Debugging & Development Tools
Complete PySerial debugging guide - analyze communication problems, monitor data flow, and optimize serial applications with professional debugging techniques
Master PySerial debugging with professional tools and techniques - analyze communication problems, monitor data flows, and optimize serial applications for reliable operation.
Serial Communication Analyzer
Build a comprehensive analyzer that captures, displays, and analyzes all serial communication for debugging complex protocols.
import serial
import threading
import time
from datetime import datetime
from typing import Dict, List, Optional, Callable
import re
import json
class SerialProtocolAnalyzer:
def __init__(self, port: str, baudrate: int = 9600):
self.port = port
self.baudrate = baudrate
self.serial = None
self.analyzing = False
# Data storage
self.packets = []
self.statistics = {
'total_packets': 0,
'tx_packets': 0,
'rx_packets': 0,
'total_bytes': 0,
'tx_bytes': 0,
'rx_bytes': 0,
'start_time': None,
'errors': 0
}
# Protocol detection
self.protocol_patterns = {
'AT_COMMANDS': re.compile(r'^AT([+]\w+)?.*$', re.IGNORECASE),
'NMEA_GPS': re.compile(r'^\$GP\w{3},.*\*[0-9A-F]{2}$'),
'MODBUS_ASCII': re.compile(r'^:[0-9A-F]+$'),
'JSON': re.compile(r'^[\{\[].*[\}\]]$'),
'CSV': re.compile(r'^[^,]+(?:,[^,]*)+$'),
'HEX_DATA': re.compile(r'^[0-9A-F\s]+$', re.IGNORECASE),
'BINARY': re.compile(r'[\x00-\x1F\x7F-\xFF]')
}
# Callbacks
self.packet_callbacks = []
self.error_callbacks = []
def connect(self) -> bool:
"""Connect to serial port"""
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=0.1, # Short timeout for analysis
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE
)
print(f"✅ Analyzer connected to {self.port} @ {self.baudrate} baud")
return True
except Exception as e:
print(f"❌ Connection failed: {e}")
return False
def start_analysis(self):
"""Start protocol analysis"""
if not self.serial:
return False
self.analyzing = True
self.statistics['start_time'] = datetime.now()
# Start analysis threads
self.rx_thread = threading.Thread(target=self._rx_analyzer)
self.tx_thread = threading.Thread(target=self._tx_analyzer)
self.rx_thread.daemon = True
self.tx_thread.daemon = True
self.rx_thread.start()
self.tx_thread.start()
print("🔍 Protocol analysis started")
return True
def _rx_analyzer(self):
"""Analyze received data"""
rx_buffer = bytearray()
while self.analyzing:
try:
if self.serial.in_waiting:
data = self.serial.read(self.serial.in_waiting)
rx_buffer.extend(data)
# Process complete packets
while self._extract_packet(rx_buffer, 'RX'):
pass
except Exception as e:
self._log_error(f"RX analysis error: {e}")
time.sleep(0.001)
def _tx_analyzer(self):
"""Analyze transmitted data (if monitoring TX line)"""
# Note: This would require additional hardware or software to monitor TX
# For now, we'll simulate by intercepting write operations
pass
def _extract_packet(self, buffer: bytearray, direction: str) -> bool:
"""Extract complete packet from buffer"""
if not buffer:
return False
# Try different packet delimiters
delimiters = [b'\r\n', b'\n', b'\r', b'\0']
for delimiter in delimiters:
if delimiter in buffer:
packet_data, remaining = buffer.split(delimiter, 1)
if packet_data:
self._process_packet(packet_data, direction, delimiter)
# Update buffer
buffer.clear()
buffer.extend(remaining)
return True
# Check for binary protocols with fixed lengths or special markers
if len(buffer) > 0:
# Look for potential binary protocol markers
if buffer[0] in [0xAA, 0x55, 0xFF, 0x02, 0x03]: # Common start bytes
# Try to extract binary packet (simplified)
if len(buffer) >= 4: # Minimum binary packet
packet_length = self._guess_binary_length(buffer)
if len(buffer) >= packet_length:
packet_data = bytes(buffer[:packet_length])
self._process_packet(packet_data, direction, b'')
buffer[:packet_length] = b''
return True
return False
def _guess_binary_length(self, buffer: bytearray) -> int:
"""Guess binary packet length"""
if len(buffer) < 2:
return 0
# Common binary protocol patterns
if buffer[0] == 0xAA and len(buffer) > 2:
# Length byte at position 2
if len(buffer) > 2:
return 3 + buffer[2]
elif buffer[0] in [0x01, 0x02, 0x03, 0x04]: # Modbus function codes
if len(buffer) >= 2:
return 8 # Typical Modbus RTU frame
# Default minimum length
return min(len(buffer), 16)
def _process_packet(self, packet_data: bytes, direction: str, delimiter: bytes):
"""Process detected packet"""
timestamp = datetime.now()
# Decode packet
try:
decoded_str = packet_data.decode('utf-8', errors='ignore')
except:
decoded_str = packet_data.hex()
# Detect protocol
protocol = self._detect_protocol(packet_data, decoded_str)
# Create packet info
packet_info = {
'timestamp': timestamp,
'direction': direction,
'raw_data': packet_data,
'decoded': decoded_str,
'length': len(packet_data),
'protocol': protocol,
'delimiter': delimiter,
'analysis': self._analyze_packet_content(packet_data, decoded_str, protocol)
}
# Store packet
self.packets.append(packet_info)
# Update statistics
self.statistics['total_packets'] += 1
self.statistics['total_bytes'] += len(packet_data)
if direction == 'TX':
self.statistics['tx_packets'] += 1
self.statistics['tx_bytes'] += len(packet_data)
else:
self.statistics['rx_packets'] += 1
self.statistics['rx_bytes'] += len(packet_data)
# Trigger callbacks
for callback in self.packet_callbacks:
try:
callback(packet_info)
except Exception as e:
self._log_error(f"Callback error: {e}")
print(f"📦 {direction} [{timestamp.strftime('%H:%M:%S.%f')[:-3]}] "
f"{protocol} ({len(packet_data)} bytes): {decoded_str[:50]}...")
def _detect_protocol(self, raw_data: bytes, decoded_str: str) -> str:
"""Detect protocol type"""
# Check for binary indicators first
if any(b >= 0x80 or (b < 0x20 and b not in [0x09, 0x0A, 0x0D]) for b in raw_data):
return 'BINARY'
# Check text-based protocols
for protocol_name, pattern in self.protocol_patterns.items():
if protocol_name != 'BINARY' and pattern.search(decoded_str):
return protocol_name
# Check for common binary protocol headers
if raw_data:
first_byte = raw_data[0]
if first_byte in [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x0F, 0x10]:
return 'MODBUS_RTU'
elif first_byte == 0xAA:
return 'CUSTOM_BINARY'
elif first_byte in [0x7E, 0x7D]:
return 'PPP_HDLC'
return 'UNKNOWN'
def _analyze_packet_content(self, raw_data: bytes, decoded_str: str, protocol: str) -> Dict:
"""Analyze packet content based on protocol"""
analysis = {'fields': {}, 'errors': [], 'warnings': []}
if protocol == 'AT_COMMANDS':
analysis.update(self._analyze_at_command(decoded_str))
elif protocol == 'NMEA_GPS':
analysis.update(self._analyze_nmea_sentence(decoded_str))
elif protocol == 'MODBUS_RTU':
analysis.update(self._analyze_modbus_rtu(raw_data))
elif protocol == 'JSON':
analysis.update(self._analyze_json(decoded_str))
elif protocol == 'CSV':
analysis.update(self._analyze_csv(decoded_str))
return analysis
def _analyze_at_command(self, data: str) -> Dict:
"""Analyze AT command"""
analysis = {'fields': {}, 'errors': [], 'warnings': []}
if data.upper().startswith('AT'):
if '+' in data:
# Extended AT command
parts = data.split('+', 1)
if len(parts) > 1:
cmd_part = parts[1]
if '=' in cmd_part:
cmd, params = cmd_part.split('=', 1)
analysis['fields']['command'] = cmd
analysis['fields']['parameters'] = params
else:
analysis['fields']['command'] = cmd_part
else:
analysis['fields']['command'] = 'BASIC_AT'
elif data in ['OK', 'ERROR', 'CONNECT', 'NO CARRIER']:
analysis['fields']['response_type'] = data
return analysis
def _analyze_nmea_sentence(self, data: str) -> Dict:
"""Analyze NMEA sentence"""
analysis = {'fields': {}, 'errors': [], 'warnings': []}
try:
# Validate checksum
if '*' in data:
sentence, checksum = data.split('*')
calc_checksum = 0
for char in sentence[1:]: # Skip $
calc_checksum ^= ord(char)
expected_checksum = f"{calc_checksum:02X}"
if checksum.upper() != expected_checksum:
analysis['errors'].append(f"Checksum mismatch: got {checksum}, expected {expected_checksum}")
else:
analysis['fields']['checksum_valid'] = True
# Parse sentence type
if data.startswith('$'):
parts = data[1:].split('*')[0].split(',')
if parts:
analysis['fields']['sentence_type'] = parts[0]
analysis['fields']['field_count'] = len(parts) - 1
# Specific NMEA sentence analysis
if parts[0] == 'GPGGA' and len(parts) >= 15:
analysis['fields']['fix_quality'] = parts[6]
analysis['fields']['satellites'] = parts[7]
analysis['fields']['altitude'] = parts[9]
except Exception as e:
analysis['errors'].append(f"NMEA parsing error: {e}")
return analysis
def _analyze_modbus_rtu(self, data: bytes) -> Dict:
"""Analyze Modbus RTU frame"""
analysis = {'fields': {}, 'errors': [], 'warnings': []}
if len(data) < 4:
analysis['errors'].append("Frame too short for Modbus RTU")
return analysis
slave_id = data[0]
function_code = data[1]
analysis['fields']['slave_id'] = slave_id
analysis['fields']['function_code'] = function_code
# Validate CRC if frame is complete
if len(data) >= 4:
frame_data = data[:-2]
received_crc = int.from_bytes(data[-2:], byteorder='little')
calculated_crc = self._calculate_modbus_crc(frame_data)
if received_crc != calculated_crc:
analysis['errors'].append(f"CRC mismatch: got {received_crc:04X}, expected {calculated_crc:04X}")
else:
analysis['fields']['crc_valid'] = True
# Analyze function code
function_names = {
0x01: "Read Coils",
0x02: "Read Discrete Inputs",
0x03: "Read Holding Registers",
0x04: "Read Input Registers",
0x05: "Write Single Coil",
0x06: "Write Single Register",
0x0F: "Write Multiple Coils",
0x10: "Write Multiple Registers"
}
if function_code in function_names:
analysis['fields']['function_name'] = function_names[function_code]
elif function_code & 0x80:
analysis['fields']['function_name'] = "Exception Response"
if len(data) > 2:
analysis['fields']['exception_code'] = data[2]
return analysis
def _calculate_modbus_crc(self, data: bytes) -> int:
"""Calculate Modbus RTU CRC16"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
def _analyze_json(self, data: str) -> Dict:
"""Analyze JSON data"""
analysis = {'fields': {}, 'errors': [], 'warnings': []}
try:
parsed = json.loads(data)
analysis['fields']['json_valid'] = True
analysis['fields']['key_count'] = len(parsed) if isinstance(parsed, dict) else 0
analysis['fields']['data_type'] = type(parsed).__name__
except json.JSONDecodeError as e:
analysis['errors'].append(f"JSON parsing error: {e}")
return analysis
def _analyze_csv(self, data: str) -> Dict:
"""Analyze CSV data"""
analysis = {'fields': {}, 'errors': [], 'warnings': []}
fields = data.split(',')
analysis['fields']['field_count'] = len(fields)
# Check for numeric fields
numeric_count = 0
for field in fields:
try:
float(field.strip())
numeric_count += 1
except ValueError:
pass
analysis['fields']['numeric_fields'] = numeric_count
return analysis
def _log_error(self, error_msg: str):
"""Log analysis error"""
self.statistics['errors'] += 1
error_info = {
'timestamp': datetime.now(),
'message': error_msg
}
for callback in self.error_callbacks:
try:
callback(error_info)
except:
pass
print(f"❌ Analysis Error: {error_msg}")
def add_packet_callback(self, callback: Callable):
"""Add packet detection callback"""
self.packet_callbacks.append(callback)
def add_error_callback(self, callback: Callable):
"""Add error callback"""
self.error_callbacks.append(callback)
def get_protocol_summary(self) -> Dict:
"""Get protocol distribution summary"""
protocol_counts = {}
for packet in self.packets:
protocol = packet['protocol']
protocol_counts[protocol] = protocol_counts.get(protocol, 0) + 1
return protocol_counts
def get_statistics(self) -> Dict:
"""Get analysis statistics"""
stats = self.statistics.copy()
if stats['start_time']:
duration = datetime.now() - stats['start_time']
stats['duration'] = duration.total_seconds()
if stats['duration'] > 0:
stats['packets_per_second'] = stats['total_packets'] / stats['duration']
stats['bytes_per_second'] = stats['total_bytes'] / stats['duration']
stats['protocol_distribution'] = self.get_protocol_summary()
return stats
def export_analysis(self, filename: str):
"""Export analysis results"""
export_data = {
'analysis_info': {
'port': self.port,
'baudrate': self.baudrate,
'start_time': self.statistics['start_time'].isoformat() if self.statistics['start_time'] else None,
'export_time': datetime.now().isoformat()
},
'statistics': self.get_statistics(),
'packets': []
}
# Export packet data
for packet in self.packets:
packet_export = {
'timestamp': packet['timestamp'].isoformat(),
'direction': packet['direction'],
'length': packet['length'],
'protocol': packet['protocol'],
'decoded': packet['decoded'],
'analysis': packet['analysis']
}
export_data['packets'].append(packet_export)
with open(filename, 'w') as f:
json.dump(export_data, f, indent=2)
print(f"📄 Analysis exported to {filename}")
def stop_analysis(self):
"""Stop protocol analysis"""
self.analyzing = False
if hasattr(self, 'rx_thread'):
self.rx_thread.join()
if hasattr(self, 'tx_thread'):
self.tx_thread.join()
print("⏹️ Protocol analysis stopped")
def close(self):
"""Close analyzer"""
self.stop_analysis()
if self.serial:
self.serial.close()
# Example usage with callbacks
def packet_handler(packet_info):
"""Handle detected packets"""
if packet_info['protocol'] == 'AT_COMMANDS':
print(f"🔔 AT Command detected: {packet_info['decoded']}")
elif len(packet_info['analysis']['errors']) > 0:
print(f"⚠️ Packet errors: {packet_info['analysis']['errors']}")
def error_handler(error_info):
"""Handle analysis errors"""
print(f"🚨 Analysis error: {error_info['message']}")
# Create and use analyzer
analyzer = SerialProtocolAnalyzer('/dev/ttyUSB0', 115200)
analyzer.add_packet_callback(packet_handler)
analyzer.add_error_callback(error_handler)
if analyzer.connect():
analyzer.start_analysis()
try:
time.sleep(60) # Analyze for 1 minute
# Show statistics
stats = analyzer.get_statistics()
print(f"\n📊 Analysis Statistics:")
print(f" Total packets: {stats['total_packets']}")
print(f" Protocol distribution: {stats['protocol_distribution']}")
print(f" Error count: {stats['errors']}")
# Export results
analyzer.export_analysis('protocol_analysis.json')
except KeyboardInterrupt:
pass
finally:
analyzer.close()
import serial
import threading
import time
from datetime import datetime
from typing import List, Optional
import curses
import queue
class SerialHexViewer:
def __init__(self, port: str, baudrate: int = 9600):
self.port = port
self.baudrate = baudrate
self.serial = None
self.running = False
# Data storage
self.data_queue = queue.Queue()
self.display_buffer = []
self.max_buffer_size = 1000
# Display settings
self.bytes_per_line = 16
self.show_ascii = True
self.show_timestamps = True
self.highlight_changes = True
def connect(self) -> bool:
"""Connect to serial port"""
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=0.1
)
return True
except Exception as e:
print(f"Connection failed: {e}")
return False
def start_capture(self):
"""Start data capture"""
if not self.serial:
return False
self.running = True
self.capture_thread = threading.Thread(target=self._capture_worker)
self.capture_thread.daemon = True
self.capture_thread.start()
return True
def _capture_worker(self):
"""Background data capture"""
while self.running:
try:
if self.serial.in_waiting:
data = self.serial.read(self.serial.in_waiting)
timestamp = datetime.now()
# Add to queue
self.data_queue.put({
'timestamp': timestamp,
'data': data,
'direction': 'RX'
})
except Exception as e:
print(f"Capture error: {e}")
time.sleep(0.001)
def format_hex_line(self, data: bytes, offset: int = 0) -> str:
"""Format bytes as hex line with ASCII"""
hex_parts = []
ascii_parts = []
for i in range(self.bytes_per_line):
if i < len(data):
byte_val = data[i]
hex_parts.append(f"{byte_val:02X}")
# ASCII representation
if 32 <= byte_val <= 126:
ascii_parts.append(chr(byte_val))
else:
ascii_parts.append('.')
else:
hex_parts.append(' ')
ascii_parts.append(' ')
hex_str = ' '.join(hex_parts)
ascii_str = ''.join(ascii_parts)
# Add separators for readability
hex_formatted = f"{hex_str[:23]} {hex_str[24:]}"
line = f"{offset:08X}: {hex_formatted}"
if self.show_ascii:
line += f" |{ascii_str}|"
return line
def run_console_viewer(self):
"""Run console-based hex viewer"""
if not self.serial:
print("Not connected to serial port")
return
self.start_capture()
try:
while self.running:
# Process queued data
while not self.data_queue.empty():
try:
item = self.data_queue.get_nowait()
self._add_to_display_buffer(item)
except queue.Empty:
break
# Display recent data
self._display_console()
time.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
self.stop_capture()
def run_curses_viewer(self):
"""Run curses-based interactive hex viewer"""
if not self.serial:
print("Not connected to serial port")
return
self.start_capture()
curses.wrapper(self._curses_main)
self.stop_capture()
def _curses_main(self, stdscr):
"""Main curses interface"""
# Initialize curses
curses.curs_set(0) # Hide cursor
stdscr.nodelay(1) # Non-blocking input
stdscr.timeout(100) # 100ms timeout
# Color pairs
if curses.has_colors():
curses.start_colors()
curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) # RX data
curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK) # TX data
curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK) # Timestamps
curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK) # Headers
display_offset = 0
paused = False
while self.running:
# Handle keyboard input
key = stdscr.getch()
if key == ord('q') or key == ord('Q'):
break
elif key == ord(' '):
paused = not paused
elif key == ord('c') or key == ord('C'):
self.display_buffer.clear()
elif key == curses.KEY_UP:
display_offset = max(0, display_offset - 1)
elif key == curses.KEY_DOWN:
display_offset = min(len(self.display_buffer) - 1, display_offset + 1)
elif key == curses.KEY_HOME:
display_offset = 0
elif key == curses.KEY_END:
display_offset = max(0, len(self.display_buffer) - 1)
# Process new data if not paused
if not paused:
while not self.data_queue.empty():
try:
item = self.data_queue.get_nowait()
self._add_to_display_buffer(item)
except queue.Empty:
break
# Clear screen and draw
stdscr.clear()
# Draw header
height, width = stdscr.getmaxyx()
header = f"Serial Hex Viewer - {self.port} @ {self.baudrate} baud"
stdscr.addstr(0, 0, header, curses.color_pair(4) | curses.A_BOLD)
status = f"{'PAUSED' if paused else 'RUNNING'} | Lines: {len(self.display_buffer)} | "
status += "Commands: Q=Quit, SPACE=Pause, C=Clear, Arrow Keys=Scroll"
stdscr.addstr(1, 0, status[:width-1])
# Draw separator
stdscr.addstr(2, 0, "-" * (width - 1))
# Draw hex data
start_row = 3
visible_lines = height - start_row - 1
for i in range(visible_lines):
line_idx = display_offset + i
if line_idx >= len(self.display_buffer):
break
display_line = self.display_buffer[line_idx]
# Choose color based on direction
color_pair = curses.color_pair(1) if display_line['direction'] == 'RX' else curses.color_pair(2)
# Format line
if self.show_timestamps:
timestamp_str = display_line['timestamp'].strftime("%H:%M:%S.%f")[:-3]
line_text = f"[{timestamp_str}] {display_line['direction']}: {display_line['formatted']}"
else:
line_text = f"{display_line['direction']}: {display_line['formatted']}"
try:
stdscr.addstr(start_row + i, 0, line_text[:width-1], color_pair)
except curses.error:
pass # Ignore errors from drawing at screen edge
# Draw footer
footer = f"Offset: {display_offset}/{len(self.display_buffer)-1} | "
footer += f"Press 'q' to quit, SPACE to pause/resume"
try:
stdscr.addstr(height - 1, 0, footer[:width-1], curses.color_pair(3))
except curses.error:
pass
# Refresh screen
stdscr.refresh()
self.running = False
def _add_to_display_buffer(self, item):
"""Add item to display buffer"""
data = item['data']
timestamp = item['timestamp']
direction = item['direction']
# Split data into lines
offset = 0
while offset < len(data):
line_data = data[offset:offset + self.bytes_per_line]
formatted_line = self.format_hex_line(line_data, offset)
self.display_buffer.append({
'timestamp': timestamp,
'direction': direction,
'formatted': formatted_line,
'raw_data': line_data
})
offset += self.bytes_per_line
# Limit buffer size
if len(self.display_buffer) > self.max_buffer_size:
self.display_buffer = self.display_buffer[-self.max_buffer_size:]
def _display_console(self):
"""Display in console (simple version)"""
# Clear screen
print('\033[2J\033[H', end='')
# Show recent entries
recent_entries = self.display_buffer[-20:] # Last 20 lines
print(f"Serial Hex Viewer - {self.port} @ {self.baudrate} baud")
print("=" * 60)
for entry in recent_entries:
timestamp_str = entry['timestamp'].strftime("%H:%M:%S.%f")[:-3]
direction = entry['direction']
formatted = entry['formatted']
print(f"[{timestamp_str}] {direction}: {formatted}")
print(f"\nTotal lines: {len(self.display_buffer)} (Press Ctrl+C to stop)")
def send_data(self, data: bytes):
"""Send data and add to display"""
if self.serial:
self.serial.write(data)
# Add to display as TX
self.data_queue.put({
'timestamp': datetime.now(),
'data': data,
'direction': 'TX'
})
def save_capture(self, filename: str):
"""Save captured data to file"""
with open(filename, 'w') as f:
f.write(f"Serial Hex Capture - {self.port} @ {self.baudrate} baud\n")
f.write(f"Captured: {datetime.now()}\n")
f.write("=" * 70 + "\n\n")
for entry in self.display_buffer:
timestamp_str = entry['timestamp'].strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
direction = entry['direction']
formatted = entry['formatted']
f.write(f"[{timestamp_str}] {direction}: {formatted}\n")
print(f"Capture saved to {filename}")
def stop_capture(self):
"""Stop data capture"""
self.running = False
if hasattr(self, 'capture_thread'):
self.capture_thread.join()
def close(self):
"""Close viewer"""
self.stop_capture()
if self.serial:
self.serial.close()
# Example usage
hex_viewer = SerialHexViewer('/dev/ttyUSB0', 115200)
if hex_viewer.connect():
print("Starting hex viewer...")
print("Choose display mode:")
print("1. Console viewer")
print("2. Interactive curses viewer")
choice = input("Enter choice (1 or 2): ")
try:
if choice == '2':
hex_viewer.run_curses_viewer()
else:
hex_viewer.run_console_viewer()
# Save capture
hex_viewer.save_capture('serial_capture.txt')
finally:
hex_viewer.close()
import serial
import threading
import time
from datetime import datetime, timedelta
from typing import List, Dict, Tuple
import statistics
import matplotlib.pyplot as plt
import numpy as np
class SerialTimingAnalyzer:
def __init__(self, port: str, baudrate: int = 9600):
self.port = port
self.baudrate = baudrate
self.serial = None
self.analyzing = False
# Timing data storage
self.byte_timestamps = []
self.packet_timestamps = []
self.inter_byte_times = []
self.inter_packet_times = []
# Analysis results
self.timing_stats = {}
# Configuration
self.packet_timeout = 0.01 # 10ms packet timeout
self.max_data_points = 10000
def connect(self) -> bool:
"""Connect to serial port"""
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=0.001 # Very short timeout for precise timing
)
return True
except Exception as e:
print(f"Connection failed: {e}")
return False
def start_analysis(self):
"""Start timing analysis"""
if not self.serial:
return False
self.analyzing = True
self.analysis_thread = threading.Thread(target=self._timing_worker)
self.analysis_thread.daemon = True
self.analysis_thread.start()
print("⏱️ Timing analysis started")
return True
def _timing_worker(self):
"""Background timing analysis"""
current_packet_start = None
last_byte_time = None
packet_bytes = []
while self.analyzing:
try:
if self.serial.in_waiting:
# High-precision timestamp
byte_time = time.perf_counter()
byte_data = self.serial.read(1)
if byte_data:
# Record byte timestamp
self.byte_timestamps.append(byte_time)
packet_bytes.append(byte_data[0])
# Calculate inter-byte time
if last_byte_time is not None:
inter_byte_time = byte_time - last_byte_time
self.inter_byte_times.append(inter_byte_time)
last_byte_time = byte_time
# Check for packet start
if current_packet_start is None:
current_packet_start = byte_time
else:
# Check for packet end (timeout)
if (current_packet_start is not None and
last_byte_time is not None and
time.perf_counter() - last_byte_time > self.packet_timeout):
# Record packet timing
packet_duration = last_byte_time - current_packet_start
self.packet_timestamps.append({
'start_time': current_packet_start,
'end_time': last_byte_time,
'duration': packet_duration,
'byte_count': len(packet_bytes),
'bytes': packet_bytes.copy()
})
# Calculate inter-packet time
if len(self.packet_timestamps) > 1:
prev_packet = self.packet_timestamps[-2]
inter_packet_time = current_packet_start - prev_packet['end_time']
self.inter_packet_times.append(inter_packet_time)
# Reset for next packet
current_packet_start = None
packet_bytes.clear()
# Limit data points to prevent memory issues
if len(self.byte_timestamps) > self.max_data_points:
self.byte_timestamps = self.byte_timestamps[-self.max_data_points//2:]
self.inter_byte_times = self.inter_byte_times[-self.max_data_points//2:]
except Exception as e:
print(f"Timing analysis error: {e}")
time.sleep(0.0001) # Minimal sleep for high precision
def calculate_statistics(self) -> Dict:
"""Calculate timing statistics"""
stats = {}
# Inter-byte timing statistics
if self.inter_byte_times:
stats['inter_byte'] = {
'count': len(self.inter_byte_times),
'mean': statistics.mean(self.inter_byte_times) * 1000, # Convert to ms
'median': statistics.median(self.inter_byte_times) * 1000,
'min': min(self.inter_byte_times) * 1000,
'max': max(self.inter_byte_times) * 1000,
'std_dev': statistics.stdev(self.inter_byte_times) * 1000 if len(self.inter_byte_times) > 1 else 0
}
# Inter-packet timing statistics
if self.inter_packet_times:
stats['inter_packet'] = {
'count': len(self.inter_packet_times),
'mean': statistics.mean(self.inter_packet_times) * 1000,
'median': statistics.median(self.inter_packet_times) * 1000,
'min': min(self.inter_packet_times) * 1000,
'max': max(self.inter_packet_times) * 1000,
'std_dev': statistics.stdev(self.inter_packet_times) * 1000 if len(self.inter_packet_times) > 1 else 0
}
# Packet statistics
if self.packet_timestamps:
packet_durations = [p['duration'] for p in self.packet_timestamps]
packet_sizes = [p['byte_count'] for p in self.packet_timestamps]
stats['packets'] = {
'count': len(self.packet_timestamps),
'duration_mean': statistics.mean(packet_durations) * 1000,
'duration_min': min(packet_durations) * 1000,
'duration_max': max(packet_durations) * 1000,
'size_mean': statistics.mean(packet_sizes),
'size_min': min(packet_sizes),
'size_max': max(packet_sizes)
}
# Calculate theoretical vs actual timing
theoretical_bit_time = 1.0 / self.baudrate
theoretical_byte_time = theoretical_bit_time * 10 # 8 data + 1 start + 1 stop
stats['theoretical'] = {
'bit_time_us': theoretical_bit_time * 1000000,
'byte_time_ms': theoretical_byte_time * 1000,
'bytes_per_second': 1.0 / theoretical_byte_time
}
# Compare with actual measurements
if self.inter_byte_times:
actual_byte_time = statistics.mean(self.inter_byte_times)
stats['comparison'] = {
'theoretical_byte_time_ms': theoretical_byte_time * 1000,
'actual_byte_time_ms': actual_byte_time * 1000,
'timing_accuracy': (1 - abs(actual_byte_time - theoretical_byte_time) / theoretical_byte_time) * 100
}
self.timing_stats = stats
return stats
def detect_timing_anomalies(self, threshold_multiplier: float = 3.0) -> List[Dict]:
"""Detect timing anomalies"""
anomalies = []
if not self.inter_byte_times:
return anomalies
# Calculate threshold based on statistics
mean_time = statistics.mean(self.inter_byte_times)
std_dev = statistics.stdev(self.inter_byte_times) if len(self.inter_byte_times) > 1 else 0
threshold = mean_time + (threshold_multiplier * std_dev)
# Find anomalies
for i, inter_time in enumerate(self.inter_byte_times):
if inter_time > threshold:
anomalies.append({
'index': i,
'time': inter_time * 1000, # Convert to ms
'threshold': threshold * 1000,
'ratio': inter_time / mean_time,
'timestamp': self.byte_timestamps[i + 1] if i + 1 < len(self.byte_timestamps) else None
})
return anomalies
def analyze_jitter(self) -> Dict:
"""Analyze timing jitter"""
if not self.inter_byte_times:
return {}
times_ms = [t * 1000 for t in self.inter_byte_times]
# Calculate jitter metrics
mean_time = statistics.mean(times_ms)
deviations = [abs(t - mean_time) for t in times_ms]
jitter_analysis = {
'mean_deviation': statistics.mean(deviations),
'max_deviation': max(deviations),
'rms_jitter': np.sqrt(np.mean([d**2 for d in deviations])),
'peak_to_peak_jitter': max(times_ms) - min(times_ms),
'coefficient_of_variation': (statistics.stdev(times_ms) / mean_time) * 100
}
return jitter_analysis
def create_timing_plots(self, save_plots: bool = True):
"""Create timing analysis plots"""
if not self.inter_byte_times and not self.inter_packet_times:
print("No timing data to plot")
return
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle(f'Serial Timing Analysis - {self.port} @ {self.baudrate} baud')
# Inter-byte timing histogram
if self.inter_byte_times:
times_ms = [t * 1000 for t in self.inter_byte_times]
axes[0, 0].hist(times_ms, bins=50, alpha=0.7, edgecolor='black')
axes[0, 0].set_title('Inter-byte Timing Distribution')
axes[0, 0].set_xlabel('Time (ms)')
axes[0, 0].set_ylabel('Count')
axes[0, 0].grid(True, alpha=0.3)
# Add theoretical line
theoretical_ms = (1.0 / self.baudrate) * 10 * 1000
axes[0, 0].axvline(theoretical_ms, color='red', linestyle='--',
label=f'Theoretical: {theoretical_ms:.3f}ms')
axes[0, 0].legend()
# Inter-byte timing over time
if self.inter_byte_times:
times_ms = [t * 1000 for t in self.inter_byte_times[-1000:]] # Last 1000 points
axes[0, 1].plot(times_ms, alpha=0.7)
axes[0, 1].set_title('Inter-byte Timing Over Time')
axes[0, 1].set_xlabel('Sample Number')
axes[0, 1].set_ylabel('Time (ms)')
axes[0, 1].grid(True, alpha=0.3)
# Packet timing
if self.packet_timestamps:
durations_ms = [p['duration'] * 1000 for p in self.packet_timestamps]
sizes = [p['byte_count'] for p in self.packet_timestamps]
axes[1, 0].scatter(sizes, durations_ms, alpha=0.6)
axes[1, 0].set_title('Packet Duration vs Size')
axes[1, 0].set_xlabel('Packet Size (bytes)')
axes[1, 0].set_ylabel('Duration (ms)')
axes[1, 0].grid(True, alpha=0.3)
# Inter-packet timing
if self.inter_packet_times:
times_ms = [t * 1000 for t in self.inter_packet_times]
axes[1, 1].hist(times_ms, bins=50, alpha=0.7, edgecolor='black')
axes[1, 1].set_title('Inter-packet Timing Distribution')
axes[1, 1].set_xlabel('Time (ms)')
axes[1, 1].set_ylabel('Count')
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
if save_plots:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'timing_analysis_{timestamp}.png'
plt.savefig(filename, dpi=300, bbox_inches='tight')
print(f"📊 Timing plots saved to {filename}")
plt.show()
def generate_timing_report(self) -> str:
"""Generate comprehensive timing report"""
stats = self.calculate_statistics()
anomalies = self.detect_timing_anomalies()
jitter = self.analyze_jitter()
report = []
report.append("Serial Timing Analysis Report")
report.append("=" * 50)
report.append(f"Port: {self.port}")
report.append(f"Baudrate: {self.baudrate}")
report.append(f"Analysis Time: {datetime.now()}")
report.append("")
# Theoretical timing
if 'theoretical' in stats:
report.append("Theoretical Timing:")
report.append(f" Bit time: {stats['theoretical']['bit_time_us']:.2f} μs")
report.append(f" Byte time: {stats['theoretical']['byte_time_ms']:.3f} ms")
report.append(f" Max bytes/sec: {stats['theoretical']['bytes_per_second']:.0f}")
report.append("")
# Inter-byte timing
if 'inter_byte' in stats:
ib = stats['inter_byte']
report.append("Inter-byte Timing:")
report.append(f" Samples: {ib['count']}")
report.append(f" Mean: {ib['mean']:.3f} ms")
report.append(f" Median: {ib['median']:.3f} ms")
report.append(f" Min: {ib['min']:.3f} ms")
report.append(f" Max: {ib['max']:.3f} ms")
report.append(f" Std Dev: {ib['std_dev']:.3f} ms")
report.append("")
# Timing accuracy
if 'comparison' in stats:
comp = stats['comparison']
report.append("Timing Accuracy:")
report.append(f" Theoretical: {comp['theoretical_byte_time_ms']:.3f} ms")
report.append(f" Actual: {comp['actual_byte_time_ms']:.3f} ms")
report.append(f" Accuracy: {comp['timing_accuracy']:.1f}%")
report.append("")
# Jitter analysis
if jitter:
report.append("Jitter Analysis:")
report.append(f" Mean deviation: {jitter['mean_deviation']:.3f} ms")
report.append(f" Max deviation: {jitter['max_deviation']:.3f} ms")
report.append(f" RMS jitter: {jitter['rms_jitter']:.3f} ms")
report.append(f" Peak-to-peak: {jitter['peak_to_peak_jitter']:.3f} ms")
report.append(f" Coefficient of variation: {jitter['coefficient_of_variation']:.2f}%")
report.append("")
# Anomalies
report.append(f"Timing Anomalies: {len(anomalies)}")
if anomalies:
report.append(" (Showing first 10)")
for i, anomaly in enumerate(anomalies[:10]):
report.append(f" {i+1}. {anomaly['time']:.3f} ms (ratio: {anomaly['ratio']:.1f}x)")
report.append("")
# Packet statistics
if 'packets' in stats:
pkt = stats['packets']
report.append("Packet Statistics:")
report.append(f" Total packets: {pkt['count']}")
report.append(f" Mean duration: {pkt['duration_mean']:.3f} ms")
report.append(f" Mean size: {pkt['size_mean']:.1f} bytes")
report.append("")
return "\n".join(report)
def stop_analysis(self):
"""Stop timing analysis"""
self.analyzing = False
if hasattr(self, 'analysis_thread'):
self.analysis_thread.join()
print("⏱️ Timing analysis stopped")
def close(self):
"""Close analyzer"""
self.stop_analysis()
if self.serial:
self.serial.close()
# Example usage
timing_analyzer = SerialTimingAnalyzer('/dev/ttyUSB0', 115200)
if timing_analyzer.connect():
timing_analyzer.start_analysis()
try:
print("Analyzing timing for 30 seconds...")
time.sleep(30)
# Generate report
report = timing_analyzer.generate_timing_report()
print(report)
# Create plots
timing_analyzer.create_timing_plots(save_plots=True)
# Save report
with open('timing_report.txt', 'w') as f:
f.write(report)
print("📄 Report saved to timing_report.txt")
except KeyboardInterrupt:
pass
finally:
timing_analyzer.close()
Development Utilities
Serial Port Monitor
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, filedialog
import serial
import serial.tools.list_ports
import threading
import time
from datetime import datetime
import json
import queue
import re
class SerialPortMonitor:
def __init__(self):
self.root = tk.Tk()
self.root.title("PySerial Development Monitor")
self.root.geometry("1200x800")
# Serial connection
self.serial = None
self.connected = tk.BooleanVar(value=False)
self.monitoring = tk.BooleanVar(value=False)
# Data handling
self.data_queue = queue.Queue()
self.auto_scroll = tk.BooleanVar(value=True)
self.display_hex = tk.BooleanVar(value=False)
self.display_timestamps = tk.BooleanVar(value=True)
self.log_to_file = tk.BooleanVar(value=False)
self.log_file = None
# Statistics
self.stats = {
'bytes_received': 0,
'bytes_sent': 0,
'lines_received': 0,
'start_time': None
}
# Create interface
self._create_interface()
# Start data processor
self.data_processor = threading.Thread(target=self._process_data_queue)
self.data_processor.daemon = True
self.data_processor.start()
# Update GUI periodically
self._update_gui()
def _create_interface(self):
"""Create the GUI interface"""
# Main notebook
notebook = ttk.Notebook(self.root)
notebook.pack(fill='both', expand=True, padx=5, pady=5)
# Monitor tab
monitor_frame = ttk.Frame(notebook)
notebook.add(monitor_frame, text="Monitor")
self._create_monitor_tab(monitor_frame)
# Send tab
send_frame = ttk.Frame(notebook)
notebook.add(send_frame, text="Send Data")
self._create_send_tab(send_frame)
# Statistics tab
stats_frame = ttk.Frame(notebook)
notebook.add(stats_frame, text="Statistics")
self._create_statistics_tab(stats_frame)
# Settings tab
settings_frame = ttk.Frame(notebook)
notebook.add(settings_frame, text="Settings")
self._create_settings_tab(settings_frame)
def _create_monitor_tab(self, parent):
"""Create monitor tab"""
# Connection frame
conn_frame = ttk.LabelFrame(parent, text="Connection")
conn_frame.pack(fill='x', padx=5, pady=5)
# Port selection
ttk.Label(conn_frame, text="Port:").grid(row=0, column=0, padx=5, pady=5, sticky='w')
self.port_var = tk.StringVar()
self.port_combo = ttk.Combobox(conn_frame, textvariable=self.port_var, width=15)
self.port_combo.grid(row=0, column=1, padx=5, pady=5)
ttk.Button(conn_frame, text="Refresh", command=self._refresh_ports).grid(row=0, column=2, padx=5)
# Baudrate selection
ttk.Label(conn_frame, text="Baudrate:").grid(row=0, column=3, padx=5, pady=5, sticky='w')
self.baudrate_var = tk.StringVar(value="9600")
baudrate_combo = ttk.Combobox(conn_frame, textvariable=self.baudrate_var,
values=["1200", "2400", "4800", "9600", "19200", "38400", "57600", "115200"],
width=10)
baudrate_combo.grid(row=0, column=4, padx=5, pady=5)
# Connection buttons
ttk.Button(conn_frame, text="Connect", command=self._connect).grid(row=0, column=5, padx=5)
ttk.Button(conn_frame, text="Disconnect", command=self._disconnect).grid(row=0, column=6, padx=5)
# Status
self.status_label = ttk.Label(conn_frame, text="Disconnected")
self.status_label.grid(row=0, column=7, padx=20, pady=5)
# Control frame
control_frame = ttk.LabelFrame(parent, text="Control")
control_frame.pack(fill='x', padx=5, pady=5)
ttk.Button(control_frame, text="Start Monitor", command=self._start_monitoring).pack(side='left', padx=5)
ttk.Button(control_frame, text="Stop Monitor", command=self._stop_monitoring).pack(side='left', padx=5)
ttk.Button(control_frame, text="Clear Display", command=self._clear_display).pack(side='left', padx=5)
ttk.Button(control_frame, text="Save Log", command=self._save_log).pack(side='left', padx=5)
# Display options
ttk.Checkbutton(control_frame, text="Auto Scroll", variable=self.auto_scroll).pack(side='right', padx=5)
ttk.Checkbutton(control_frame, text="Show Hex", variable=self.display_hex).pack(side='right', padx=5)
ttk.Checkbutton(control_frame, text="Timestamps", variable=self.display_timestamps).pack(side='right', padx=5)
# Data display
display_frame = ttk.LabelFrame(parent, text="Data")
display_frame.pack(fill='both', expand=True, padx=5, pady=5)
self.data_text = scrolledtext.ScrolledText(display_frame, wrap=tk.WORD, font=('Consolas', 10))
self.data_text.pack(fill='both', expand=True, padx=5, pady=5)
# Configure text tags for coloring
self.data_text.tag_configure('rx', foreground='blue')
self.data_text.tag_configure('tx', foreground='red')
self.data_text.tag_configure('timestamp', foreground='gray')
self.data_text.tag_configure('hex', foreground='green')
def _create_send_tab(self, parent):
"""Create send data tab"""
# Send frame
send_frame = ttk.LabelFrame(parent, text="Send Data")
send_frame.pack(fill='x', padx=5, pady=5)
# Single line send
ttk.Label(send_frame, text="Send Text:").grid(row=0, column=0, padx=5, pady=5, sticky='w')
self.send_text_var = tk.StringVar()
send_entry = ttk.Entry(send_frame, textvariable=self.send_text_var, width=50)
send_entry.grid(row=0, column=1, padx=5, pady=5, sticky='ew')
send_entry.bind('<Return>', lambda e: self._send_text())
ttk.Button(send_frame, text="Send", command=self._send_text).grid(row=0, column=2, padx=5)
# Line ending options
ttk.Label(send_frame, text="Line Ending:").grid(row=1, column=0, padx=5, pady=5, sticky='w')
self.line_ending_var = tk.StringVar(value="\\r\\n")
ending_combo = ttk.Combobox(send_frame, textvariable=self.line_ending_var,
values=["None", "\\n", "\\r", "\\r\\n"], width=10)
ending_combo.grid(row=1, column=1, padx=5, pady=5, sticky='w')
send_frame.columnconfigure(1, weight=1)
# Multi-line send
multi_frame = ttk.LabelFrame(parent, text="Multi-line Send")
multi_frame.pack(fill='both', expand=True, padx=5, pady=5)
self.multi_text = scrolledtext.ScrolledText(multi_frame, height=10, font=('Consolas', 10))
self.multi_text.pack(fill='both', expand=True, padx=5, pady=5)
# Multi-line controls
multi_control = ttk.Frame(multi_frame)
multi_control.pack(fill='x', padx=5, pady=5)
ttk.Button(multi_control, text="Send All", command=self._send_multi_text).pack(side='left', padx=5)
ttk.Button(multi_control, text="Clear", command=lambda: self.multi_text.delete(1.0, tk.END)).pack(side='left', padx=5)
ttk.Button(multi_control, text="Load File", command=self._load_send_file).pack(side='left', padx=5)
# Delay option for multi-line
ttk.Label(multi_control, text="Line Delay (ms):").pack(side='right', padx=5)
self.line_delay_var = tk.StringVar(value="100")
ttk.Entry(multi_control, textvariable=self.line_delay_var, width=10).pack(side='right', padx=5)
# Hex send
hex_frame = ttk.LabelFrame(parent, text="Send Hex")
hex_frame.pack(fill='x', padx=5, pady=5)
ttk.Label(hex_frame, text="Hex Data:").grid(row=0, column=0, padx=5, pady=5, sticky='w')
self.hex_var = tk.StringVar()
hex_entry = ttk.Entry(hex_frame, textvariable=self.hex_var, width=50)
hex_entry.grid(row=0, column=1, padx=5, pady=5, sticky='ew')
ttk.Button(hex_frame, text="Send Hex", command=self._send_hex).grid(row=0, column=2, padx=5)
hex_frame.columnconfigure(1, weight=1)
def _create_statistics_tab(self, parent):
"""Create statistics tab"""
stats_frame = ttk.LabelFrame(parent, text="Communication Statistics")
stats_frame.pack(fill='both', expand=True, padx=5, pady=5)
self.stats_text = scrolledtext.ScrolledText(stats_frame, height=20, font=('Consolas', 10))
self.stats_text.pack(fill='both', expand=True, padx=5, pady=5)
# Control buttons
button_frame = ttk.Frame(stats_frame)
button_frame.pack(fill='x', padx=5, pady=5)
ttk.Button(button_frame, text="Update Stats", command=self._update_statistics).pack(side='left', padx=5)
ttk.Button(button_frame, text="Reset Stats", command=self._reset_statistics).pack(side='left', padx=5)
ttk.Button(button_frame, text="Export Stats", command=self._export_statistics).pack(side='left', padx=5)
def _create_settings_tab(self, parent):
"""Create settings tab"""
# Display settings
display_frame = ttk.LabelFrame(parent, text="Display Settings")
display_frame.pack(fill='x', padx=5, pady=5)
ttk.Checkbutton(display_frame, text="Log to File", variable=self.log_to_file).pack(anchor='w', padx=5, pady=2)
# Font settings
font_frame = ttk.Frame(display_frame)
font_frame.pack(fill='x', padx=5, pady=5)
ttk.Label(font_frame, text="Font Size:").pack(side='left', padx=5)
self.font_size_var = tk.StringVar(value="10")
font_spin = tk.Spinbox(font_frame, from_=8, to=20, width=5, textvariable=self.font_size_var)
font_spin.pack(side='left', padx=5)
ttk.Button(font_frame, text="Apply", command=self._update_font).pack(side='left', padx=5)
# Buffer settings
buffer_frame = ttk.LabelFrame(parent, text="Buffer Settings")
buffer_frame.pack(fill='x', padx=5, pady=5)
ttk.Label(buffer_frame, text="Max Display Lines:").grid(row=0, column=0, padx=5, pady=5, sticky='w')
self.max_lines_var = tk.StringVar(value="1000")
ttk.Entry(buffer_frame, textvariable=self.max_lines_var, width=10).grid(row=0, column=1, padx=5, pady=5)
# Advanced settings
advanced_frame = ttk.LabelFrame(parent, text="Advanced Settings")
advanced_frame.pack(fill='x', padx=5, pady=5)
ttk.Label(advanced_frame, text="Data Bits:").grid(row=0, column=0, padx=5, pady=5, sticky='w')
self.databits_var = tk.StringVar(value="8")
ttk.Combobox(advanced_frame, textvariable=self.databits_var, values=["5", "6", "7", "8"], width=5).grid(row=0, column=1, padx=5, pady=5)
ttk.Label(advanced_frame, text="Parity:").grid(row=0, column=2, padx=5, pady=5, sticky='w')
self.parity_var = tk.StringVar(value="None")
ttk.Combobox(advanced_frame, textvariable=self.parity_var, values=["None", "Even", "Odd"], width=8).grid(row=0, column=3, padx=5, pady=5)
ttk.Label(advanced_frame, text="Stop Bits:").grid(row=1, column=0, padx=5, pady=5, sticky='w')
self.stopbits_var = tk.StringVar(value="1")
ttk.Combobox(advanced_frame, textvariable=self.stopbits_var, values=["1", "1.5", "2"], width=5).grid(row=1, column=1, padx=5, pady=5)
def _refresh_ports(self):
"""Refresh available ports"""
ports = serial.tools.list_ports.comports()
port_list = [port.device for port in ports]
self.port_combo['values'] = port_list
if port_list and not self.port_var.get():
self.port_var.set(port_list[0])
def _connect(self):
"""Connect to serial port"""
if self.connected.get():
return
port = self.port_var.get()
if not port:
messagebox.showerror("Error", "Please select a port")
return
try:
# Parse settings
baudrate = int(self.baudrate_var.get())
databits_map = {"5": 5, "6": 6, "7": 7, "8": 8}
databits = databits_map[self.databits_var.get()]
parity_map = {"None": serial.PARITY_NONE, "Even": serial.PARITY_EVEN, "Odd": serial.PARITY_ODD}
parity = parity_map[self.parity_var.get()]
stopbits_map = {"1": serial.STOPBITS_ONE, "1.5": serial.STOPBITS_ONE_POINT_FIVE, "2": serial.STOPBITS_TWO}
stopbits = stopbits_map[self.stopbits_var.get()]
# Connect
self.serial = serial.Serial(
port=port,
baudrate=baudrate,
bytesize=databits,
parity=parity,
stopbits=stopbits,
timeout=0.1
)
self.connected.set(True)
self.status_label.config(text=f"Connected to {port}")
# Reset statistics
self._reset_statistics()
except Exception as e:
messagebox.showerror("Connection Error", f"Failed to connect: {e}")
def _disconnect(self):
"""Disconnect from serial port"""
if self.serial:
self._stop_monitoring()
self.serial.close()
self.serial = None
self.connected.set(False)
self.status_label.config(text="Disconnected")
def _start_monitoring(self):
"""Start monitoring serial data"""
if not self.connected.get():
messagebox.showerror("Error", "Not connected to a port")
return
self.monitoring.set(True)
self.stats['start_time'] = datetime.now()
# Start monitor thread
self.monitor_thread = threading.Thread(target=self._monitor_worker)
self.monitor_thread.daemon = True
self.monitor_thread.start()
def _stop_monitoring(self):
"""Stop monitoring"""
self.monitoring.set(False)
def _monitor_worker(self):
"""Background monitoring worker"""
while self.monitoring.get() and self.connected.get():
try:
if self.serial.in_waiting:
data = self.serial.read(self.serial.in_waiting)
# Add to queue for GUI processing
self.data_queue.put({
'timestamp': datetime.now(),
'direction': 'RX',
'data': data
})
self.stats['bytes_received'] += len(data)
except Exception as e:
print(f"Monitor error: {e}")
time.sleep(0.01)
def _process_data_queue(self):
"""Process data queue for display"""
while True:
try:
item = self.data_queue.get(timeout=0.1)
self.root.after(0, self._display_data, item)
except queue.Empty:
continue
except Exception as e:
print(f"Data processing error: {e}")
def _display_data(self, item):
"""Display data in GUI"""
timestamp = item['timestamp']
direction = item['direction']
data = item['data']
# Format display text
display_text = ""
if self.display_timestamps.get():
ts_str = timestamp.strftime("%H:%M:%S.%f")[:-3]
display_text += f"[{ts_str}] "
display_text += f"{direction}: "
if self.display_hex.get():
# Display as hex
hex_str = data.hex().upper()
# Add spaces between bytes
hex_formatted = ' '.join(hex_str[i:i+2] for i in range(0, len(hex_str), 2))
display_text += hex_formatted
else:
# Display as text
text_str = data.decode('utf-8', errors='ignore')
display_text += repr(text_str)
display_text += "\n"
# Insert with appropriate tags
start_pos = self.data_text.index(tk.END)
self.data_text.insert(tk.END, display_text)
end_pos = self.data_text.index(tk.END)
# Apply tags
if direction == 'RX':
self.data_text.tag_add('rx', start_pos, end_pos)
else:
self.data_text.tag_add('tx', start_pos, end_pos)
# Limit buffer size
max_lines = int(self.max_lines_var.get())
lines = int(self.data_text.index('end-1c').split('.')[0])
if lines > max_lines:
self.data_text.delete('1.0', f'{lines - max_lines}.0')
# Auto scroll
if self.auto_scroll.get():
self.data_text.see(tk.END)
# Log to file
if self.log_to_file.get() and not self.log_file:
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
self.log_file = open(f'serial_log_{timestamp_str}.txt', 'w')
if self.log_file:
self.log_file.write(display_text)
self.log_file.flush()
def _send_text(self):
"""Send text data"""
if not self.connected.get():
messagebox.showerror("Error", "Not connected")
return
text = self.send_text_var.get()
if not text:
return
# Add line ending
ending = self.line_ending_var.get()
if ending != "None":
ending = ending.replace('\\n', '\n').replace('\\r', '\r')
text += ending
# Send data
data = text.encode('utf-8')
self.serial.write(data)
# Add to display
self.data_queue.put({
'timestamp': datetime.now(),
'direction': 'TX',
'data': data
})
self.stats['bytes_sent'] += len(data)
self.send_text_var.set("") # Clear input
def _send_multi_text(self):
"""Send multi-line text"""
if not self.connected.get():
messagebox.showerror("Error", "Not connected")
return
text = self.multi_text.get(1.0, tk.END).strip()
if not text:
return
lines = text.split('\n')
delay = int(self.line_delay_var.get()) / 1000.0
def send_worker():
for line in lines:
if line.strip():
self._send_line_with_ending(line)
time.sleep(delay)
threading.Thread(target=send_worker, daemon=True).start()
def _send_line_with_ending(self, line):
"""Send single line with line ending"""
ending = self.line_ending_var.get()
if ending != "None":
ending = ending.replace('\\n', '\n').replace('\\r', '\r')
line += ending
data = line.encode('utf-8')
self.serial.write(data)
self.data_queue.put({
'timestamp': datetime.now(),
'direction': 'TX',
'data': data
})
self.stats['bytes_sent'] += len(data)
def _send_hex(self):
"""Send hex data"""
if not self.connected.get():
messagebox.showerror("Error", "Not connected")
return
hex_str = self.hex_var.get().replace(' ', '').replace('0x', '')
try:
data = bytes.fromhex(hex_str)
self.serial.write(data)
self.data_queue.put({
'timestamp': datetime.now(),
'direction': 'TX',
'data': data
})
self.stats['bytes_sent'] += len(data)
self.hex_var.set("")
except ValueError:
messagebox.showerror("Error", "Invalid hex data")
def _clear_display(self):
"""Clear the display"""
self.data_text.delete(1.0, tk.END)
def _save_log(self):
"""Save log to file"""
filename = filedialog.asksaveasfilename(
defaultextension=".txt",
filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
)
if filename:
try:
with open(filename, 'w') as f:
f.write(self.data_text.get(1.0, tk.END))
messagebox.showinfo("Success", f"Log saved to {filename}")
except Exception as e:
messagebox.showerror("Error", f"Failed to save: {e}")
def _load_send_file(self):
"""Load file to send"""
filename = filedialog.askopenfilename(
filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
)
if filename:
try:
with open(filename, 'r') as f:
content = f.read()
self.multi_text.delete(1.0, tk.END)
self.multi_text.insert(1.0, content)
except Exception as e:
messagebox.showerror("Error", f"Failed to load: {e}")
def _update_font(self):
"""Update font size"""
try:
size = int(self.font_size_var.get())
font = ('Consolas', size)
self.data_text.config(font=font)
self.multi_text.config(font=font)
except ValueError:
pass
def _update_statistics(self):
"""Update statistics display"""
stats_text = "Communication Statistics\n"
stats_text += "=" * 30 + "\n\n"
if self.stats['start_time']:
duration = datetime.now() - self.stats['start_time']
stats_text += f"Session Duration: {duration}\n"
if duration.total_seconds() > 0:
rx_rate = self.stats['bytes_received'] / duration.total_seconds()
tx_rate = self.stats['bytes_sent'] / duration.total_seconds()
stats_text += f"RX Rate: {rx_rate:.1f} bytes/sec\n"
stats_text += f"TX Rate: {tx_rate:.1f} bytes/sec\n"
stats_text += f"\nBytes Received: {self.stats['bytes_received']:,}\n"
stats_text += f"Bytes Sent: {self.stats['bytes_sent']:,}\n"
stats_text += f"Total Bytes: {self.stats['bytes_received'] + self.stats['bytes_sent']:,}\n"
# Connection info
if self.connected.get() and self.serial:
stats_text += f"\nConnection Info:\n"
stats_text += f"Port: {self.serial.port}\n"
stats_text += f"Baudrate: {self.serial.baudrate}\n"
stats_text += f"Data Bits: {self.serial.bytesize}\n"
stats_text += f"Parity: {self.serial.parity}\n"
stats_text += f"Stop Bits: {self.serial.stopbits}\n"
self.stats_text.delete(1.0, tk.END)
self.stats_text.insert(1.0, stats_text)
def _reset_statistics(self):
"""Reset statistics"""
self.stats = {
'bytes_received': 0,
'bytes_sent': 0,
'lines_received': 0,
'start_time': datetime.now() if self.connected.get() else None
}
self._update_statistics()
def _export_statistics(self):
"""Export statistics to JSON"""
filename = filedialog.asksaveasfilename(
defaultextension=".json",
filetypes=[("JSON files", "*.json"), ("All files", "*.*")]
)
if filename:
try:
export_data = self.stats.copy()
if export_data['start_time']:
export_data['start_time'] = export_data['start_time'].isoformat()
export_data['export_time'] = datetime.now().isoformat()
with open(filename, 'w') as f:
json.dump(export_data, f, indent=2)
messagebox.showinfo("Success", f"Statistics exported to {filename}")
except Exception as e:
messagebox.showerror("Error", f"Failed to export: {e}")
def _update_gui(self):
"""Update GUI periodically"""
# Auto-update statistics
if self.connected.get() and self.monitoring.get():
self._update_statistics()
# Schedule next update
self.root.after(5000, self._update_gui) # Update every 5 seconds
def run(self):
"""Run the monitor"""
# Initialize
self._refresh_ports()
# Start main loop
try:
self.root.mainloop()
finally:
# Cleanup
if self.log_file:
self.log_file.close()
if self.serial:
self.serial.close()
# Run the monitor
if __name__ == "__main__":
monitor = SerialPortMonitor()
monitor.run()
Protocol Simulator
import serial
import threading
import time
from datetime import datetime
from typing import Dict, List, Callable, Any
import random
import json
class ProtocolSimulator:
def __init__(self, port: str, baudrate: int = 9600):
self.port = port
self.baudrate = baudrate
self.serial = None
self.running = False
# Protocol handlers
self.handlers = {}
self.response_delays = {}
# Default handlers
self._setup_default_handlers()
# Statistics
self.stats = {
'commands_processed': 0,
'responses_sent': 0,
'errors': 0,
'start_time': None
}
def _setup_default_handlers(self):
"""Setup default protocol handlers"""
# AT Command handler
self.add_handler('AT.*', self._handle_at_command, delay_range=(0.1, 0.5))
# JSON handler
self.add_handler(r'\{.*\}', self._handle_json, delay_range=(0.05, 0.2))
# Modbus RTU handler (simplified)
self.add_handler(r'[\x01-\xF7][\x01-\x10].*', self._handle_modbus_rtu, delay_range=(0.01, 0.05))
# NMEA handler
self.add_handler(r'\$.*\*[0-9A-F]{2}', self._handle_nmea, delay_range=(0.1, 1.0))
def connect(self) -> bool:
"""Connect to serial port"""
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=0.1
)
print(f"✅ Protocol simulator connected to {self.port}")
return True
except Exception as e:
print(f"❌ Connection failed: {e}")
return False
def add_handler(self, pattern: str, handler: Callable, delay_range: tuple = (0.01, 0.1)):
"""Add protocol handler"""
import re
self.handlers[re.compile(pattern)] = handler
self.response_delays[handler] = delay_range
def start_simulation(self):
"""Start protocol simulation"""
if not self.serial:
return False
self.running = True
self.stats['start_time'] = datetime.now()
# Start simulation thread
self.sim_thread = threading.Thread(target=self._simulation_worker)
self.sim_thread.daemon = True
self.sim_thread.start()
print("🎭 Protocol simulation started")
return True
def _simulation_worker(self):
"""Background simulation worker"""
command_buffer = bytearray()
while self.running:
try:
if self.serial.in_waiting:
data = self.serial.read(self.serial.in_waiting)
command_buffer.extend(data)
# Process complete commands
while self._process_command_buffer(command_buffer):
pass
except Exception as e:
print(f"Simulation error: {e}")
self.stats['errors'] += 1
time.sleep(0.001)
def _process_command_buffer(self, buffer: bytearray) -> bool:
"""Process command from buffer"""
if not buffer:
return False
# Try different command delimiters
delimiters = [b'\r\n', b'\n', b'\r', b'\0']
for delimiter in delimiters:
if delimiter in buffer:
command_data, remaining = buffer.split(delimiter, 1)
if command_data:
self._handle_command(command_data)
# Update buffer
buffer.clear()
buffer.extend(remaining)
return True
# Check for binary commands (fixed length or special markers)
if len(buffer) >= 8: # Minimum for Modbus RTU
# Try to process as binary command
if self._handle_binary_command(buffer):
return True
return False
def _handle_command(self, command_data: bytes):
"""Handle received command"""
try:
# Try to decode as text
command_str = command_data.decode('utf-8', errors='ignore')
except:
command_str = command_data.hex()
print(f"📥 Received: {command_str}")
# Find matching handler
handler_found = False
for pattern, handler in self.handlers.items():
if pattern.search(command_str) or pattern.search(command_data.decode('utf-8', errors='ignore')):
self._execute_handler(handler, command_data, command_str)
handler_found = True
break
if not handler_found:
# Send generic error response
self._send_response(b'ERROR\r\n', delay=0.1)
self.stats['commands_processed'] += 1
def _handle_binary_command(self, buffer: bytearray) -> bool:
"""Handle binary command"""
# Simplified Modbus RTU detection
if len(buffer) >= 8 and buffer[0] <= 0xF7 and buffer[1] <= 0x10:
# Assume Modbus RTU frame
command_data = bytes(buffer[:8]) # Typical frame size
self._handle_command(command_data)
buffer[:8] = b''
return True
return False
def _execute_handler(self, handler: Callable, command_data: bytes, command_str: str):
"""Execute protocol handler with delay"""
def delayed_handler():
# Calculate delay
delay_range = self.response_delays.get(handler, (0.01, 0.1))
delay = random.uniform(*delay_range)
time.sleep(delay)
try:
response = handler(command_data, command_str)
if response:
self._send_response(response)
except Exception as e:
print(f"Handler error: {e}")
self.stats['errors'] += 1
# Execute in separate thread for realistic timing
threading.Thread(target=delayed_handler, daemon=True).start()
def _send_response(self, response: bytes, delay: float = 0):
"""Send response with optional delay"""
if delay > 0:
time.sleep(delay)
try:
self.serial.write(response)
self.serial.flush()
# Decode for display
try:
response_str = response.decode('utf-8', errors='ignore')
except:
response_str = response.hex()
print(f"📤 Sent: {response_str.strip()}")
self.stats['responses_sent'] += 1
except Exception as e:
print(f"Send error: {e}")
self.stats['errors'] += 1
def _handle_at_command(self, command_data: bytes, command_str: str) -> bytes:
"""Handle AT commands"""
cmd = command_str.strip().upper()
if cmd == 'AT':
return b'OK\r\n'
elif cmd == 'ATI':
return b'Protocol Simulator v1.0\r\nOK\r\n'
elif cmd.startswith('AT+GMI'):
return b'Simulator Corp\r\nOK\r\n'
elif cmd.startswith('AT+CSQ'):
# Random signal quality
rssi = random.randint(5, 31)
return f'+CSQ: {rssi},99\r\nOK\r\n'.encode()
elif cmd.startswith('AT+COPS'):
return b'+COPS: 0,0,"Simulator Network"\r\nOK\r\n'
elif cmd.startswith('AT+'):
# Generic extended command response
return b'OK\r\n'
else:
return b'ERROR\r\n'
def _handle_json(self, command_data: bytes, command_str: str) -> bytes:
"""Handle JSON commands"""
try:
data = json.loads(command_str)
# Create response based on command
response = {
'status': 'success',
'timestamp': datetime.now().isoformat(),
'echo': data
}
# Add simulated data based on request
if 'sensors' in data:
response['sensors'] = {
'temperature': round(random.uniform(20, 30), 1),
'humidity': round(random.uniform(40, 80), 1),
'pressure': round(random.uniform(1000, 1020), 2)
}
if 'device_info' in data:
response['device_info'] = {
'name': 'Protocol Simulator',
'version': '1.0',
'uptime': int(time.time())
}
return json.dumps(response).encode() + b'\r\n'
except json.JSONDecodeError:
error_response = {
'status': 'error',
'message': 'Invalid JSON'
}
return json.dumps(error_response).encode() + b'\r\n'
def _handle_modbus_rtu(self, command_data: bytes, command_str: str) -> bytes:
"""Handle Modbus RTU commands (simplified)"""
if len(command_data) < 4:
return b'' # Invalid frame
slave_id = command_data[0]
function_code = command_data[1]
# Simulate responses for common function codes
if function_code == 0x03: # Read holding registers
if len(command_data) >= 6:
start_addr = int.from_bytes(command_data[2:4], 'big')
count = int.from_bytes(command_data[4:6], 'big')
# Create response
byte_count = count * 2
response = bytearray([slave_id, function_code, byte_count])
# Add random register values
for _ in range(count):
value = random.randint(0, 65535)
response.extend(value.to_bytes(2, 'big'))
# Add CRC
crc = self._calculate_modbus_crc(response)
response.extend(crc.to_bytes(2, 'little'))
return bytes(response)
elif function_code == 0x01: # Read coils
# Simplified coil response
response = bytearray([slave_id, function_code, 1, random.randint(0, 255)])
crc = self._calculate_modbus_crc(response)
response.extend(crc.to_bytes(2, 'little'))
return bytes(response)
return b'' # Unsupported function
def _calculate_modbus_crc(self, data: bytes) -> int:
"""Calculate Modbus RTU CRC16"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
def _handle_nmea(self, command_data: bytes, command_str: str) -> bytes:
"""Handle NMEA sentences"""
sentence = command_str.strip()
if sentence.startswith('$GPGGA'):
# GPS fix sentence
time_str = datetime.now().strftime('%H%M%S.00')
lat = f"{random.uniform(40, 41):.6f}"
lon = f"{random.uniform(-74, -73):.6f}"
# Convert to DDMM.MMMM format
lat_deg = int(float(lat))
lat_min = (float(lat) - lat_deg) * 60
lat_nmea = f"{lat_deg:02d}{lat_min:07.4f}"
lon_deg = int(abs(float(lon)))
lon_min = (abs(float(lon)) - lon_deg) * 60
lon_nmea = f"{lon_deg:03d}{lon_min:07.4f}"
response = f"$GPGGA,{time_str},{lat_nmea},N,{lon_nmea},W,1,08,1.0,545.4,M,46.9,M,,"
# Calculate checksum
checksum = 0
for char in response[1:]: # Skip $
checksum ^= ord(char)
response += f"*{checksum:02X}\r\n"
return response.encode()
return b'' # Unsupported sentence
def add_custom_handler(self, name: str, pattern: str, response_func: Callable):
"""Add custom protocol handler"""
self.add_handler(pattern, response_func)
print(f"Added custom handler: {name}")
def get_statistics(self) -> Dict:
"""Get simulation statistics"""
stats = self.stats.copy()
if stats['start_time']:
duration = datetime.now() - stats['start_time']
stats['duration'] = duration.total_seconds()
if stats['duration'] > 0:
stats['commands_per_second'] = stats['commands_processed'] / stats['duration']
stats['responses_per_second'] = stats['responses_sent'] / stats['duration']
return stats
def stop_simulation(self):
"""Stop protocol simulation"""
self.running = False
if hasattr(self, 'sim_thread'):
self.sim_thread.join()
print("🎭 Protocol simulation stopped")
def close(self):
"""Close simulator"""
self.stop_simulation()
if self.serial:
self.serial.close()
# Example usage with custom handler
def custom_sensor_handler(command_data: bytes, command_str: str) -> bytes:
"""Custom sensor data handler"""
if 'TEMP' in command_str:
temp = round(random.uniform(20, 25), 1)
return f"TEMP:{temp}C\r\n".encode()
elif 'HUMID' in command_str:
humid = round(random.uniform(45, 65), 1)
return f"HUMID:{humid}%\r\n".encode()
return b'UNKNOWN\r\n'
# Create and run simulator
simulator = ProtocolSimulator('/dev/ttyUSB1', 115200) # Different port for simulation
# Add custom handler
simulator.add_custom_handler('Sensor Commands', r'(TEMP|HUMID)', custom_sensor_handler)
if simulator.connect():
simulator.start_simulation()
try:
print("Protocol simulator running... (Press Ctrl+C to stop)")
# Show statistics periodically
while True:
time.sleep(10)
stats = simulator.get_statistics()
print(f"📊 Stats: {stats['commands_processed']} commands, {stats['responses_sent']} responses, {stats['errors']} errors")
except KeyboardInterrupt:
pass
finally:
simulator.close()
Protocol Analyzer
Deep packet inspection and protocol detection
Development Monitor
Professional GUI for debugging and testing
Timing Analysis
Precise timing measurements and jitter analysis
Protocol Simulator
Simulate devices for testing and development
Professional debugging tools are essential for reliable PySerial applications. These utilities provide deep insight into communication patterns, timing characteristics, and protocol behavior for robust development.
How is this guide?