PySerial
PySerialDocs

Debugging & Development Tools

Complete PySerial debugging guide: analyze communication problems, monitor data flow, and optimize serial applications with professional debugging techniques.

Master PySerial debugging with professional tools and techniques: analyze communication problems, monitor data flow, and optimize serial applications for reliable operation.

Serial Communication Analyzer

Build a comprehensive analyzer that captures, displays, and analyzes all serial communication for debugging complex protocols.

import serial
import threading
import time
from datetime import datetime
from typing import Dict, List, Optional, Callable
import re
import json

class SerialProtocolAnalyzer:
    def __init__(self, port: str, baudrate: int = 9600):
        self.port = port
        self.baudrate = baudrate
        self.serial = None
        self.analyzing = False
        
        # Data storage
        self.packets = []
        self.statistics = {
            'total_packets': 0,
            'tx_packets': 0,
            'rx_packets': 0,
            'total_bytes': 0,
            'tx_bytes': 0,
            'rx_bytes': 0,
            'start_time': None,
            'errors': 0
        }
        
        # Protocol detection
        self.protocol_patterns = {
            'AT_COMMANDS': re.compile(r'^AT([+]\w+)?.*$', re.IGNORECASE),
            'NMEA_GPS': re.compile(r'^\$GP\w{3},.*\*[0-9A-F]{2}$'),
            'MODBUS_ASCII': re.compile(r'^:[0-9A-F]+$'),
            'JSON': re.compile(r'^[\{\[].*[\}\]]$'),
            'CSV': re.compile(r'^[^,]+(?:,[^,]*)+$'),
            'HEX_DATA': re.compile(r'^[0-9A-F\s]+$', re.IGNORECASE),
            'BINARY': re.compile(r'[\x00-\x1F\x7F-\xFF]')
        }
        
        # Callbacks
        self.packet_callbacks = []
        self.error_callbacks = []
        
    def connect(self) -> bool:
        """Open the serial port with 8 data bits, no parity and one stop bit.

        Returns:
            True once the port is open. False if anything raised while
            opening it; the exception is printed rather than propagated.
        """
        try:
            settings = dict(
                port=self.port,
                baudrate=self.baudrate,
                timeout=0.1,  # Short timeout for analysis
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
            )
            self.serial = serial.Serial(**settings)
            print(f"✅ Analyzer connected to {self.port} @ {self.baudrate} baud")
        except Exception as e:
            print(f"❌ Connection failed: {e}")
            return False
        return True
            
    def start_analysis(self):
        """Start protocol analysis"""
        if not self.serial:
            return False
            
        self.analyzing = True
        self.statistics['start_time'] = datetime.now()
        
        # Start analysis threads
        self.rx_thread = threading.Thread(target=self._rx_analyzer)
        self.tx_thread = threading.Thread(target=self._tx_analyzer)
        
        self.rx_thread.daemon = True
        self.tx_thread.daemon = True
        
        self.rx_thread.start()
        self.tx_thread.start()
        
        print("🔍 Protocol analysis started")
        return True
        
    def _rx_analyzer(self):
        """Analyze received data"""
        rx_buffer = bytearray()
        
        while self.analyzing:
            try:
                if self.serial.in_waiting:
                    data = self.serial.read(self.serial.in_waiting)
                    rx_buffer.extend(data)
                    
                    # Process complete packets
                    while self._extract_packet(rx_buffer, 'RX'):
                        pass
                        
            except Exception as e:
                self._log_error(f"RX analysis error: {e}")
                
            time.sleep(0.001)
            
    def _tx_analyzer(self):
        """Analyze transmitted data (if monitoring TX line)"""
        # Note: This would require additional hardware or software to monitor TX
        # For now, we'll simulate by intercepting write operations
        pass
        
    def _extract_packet(self, buffer: bytearray, direction: str) -> bool:
        """Extract complete packet from buffer"""
        if not buffer:
            return False
            
        # Try different packet delimiters
        delimiters = [b'\r\n', b'\n', b'\r', b'\0']
        
        for delimiter in delimiters:
            if delimiter in buffer:
                packet_data, remaining = buffer.split(delimiter, 1)
                
                if packet_data:
                    self._process_packet(packet_data, direction, delimiter)
                    
                # Update buffer
                buffer.clear()
                buffer.extend(remaining)
                return True
                
        # Check for binary protocols with fixed lengths or special markers
        if len(buffer) > 0:
            # Look for potential binary protocol markers
            if buffer[0] in [0xAA, 0x55, 0xFF, 0x02, 0x03]:  # Common start bytes
                # Try to extract binary packet (simplified)
                if len(buffer) >= 4:  # Minimum binary packet
                    packet_length = self._guess_binary_length(buffer)
                    if len(buffer) >= packet_length:
                        packet_data = bytes(buffer[:packet_length])
                        self._process_packet(packet_data, direction, b'')
                        buffer[:packet_length] = b''
                        return True
                        
        return False
        
    def _guess_binary_length(self, buffer: bytearray) -> int:
        """Guess binary packet length"""
        if len(buffer) < 2:
            return 0
            
        # Common binary protocol patterns
        if buffer[0] == 0xAA and len(buffer) > 2:
            # Length byte at position 2
            if len(buffer) > 2:
                return 3 + buffer[2]
                
        elif buffer[0] in [0x01, 0x02, 0x03, 0x04]:  # Modbus function codes
            if len(buffer) >= 2:
                return 8  # Typical Modbus RTU frame
                
        # Default minimum length
        return min(len(buffer), 16)
        
    def _process_packet(self, packet_data: bytes, direction: str, delimiter: bytes):
        """Process detected packet"""
        timestamp = datetime.now()
        
        # Decode packet
        try:
            decoded_str = packet_data.decode('utf-8', errors='ignore')
        except:
            decoded_str = packet_data.hex()
            
        # Detect protocol
        protocol = self._detect_protocol(packet_data, decoded_str)
        
        # Create packet info
        packet_info = {
            'timestamp': timestamp,
            'direction': direction,
            'raw_data': packet_data,
            'decoded': decoded_str,
            'length': len(packet_data),
            'protocol': protocol,
            'delimiter': delimiter,
            'analysis': self._analyze_packet_content(packet_data, decoded_str, protocol)
        }
        
        # Store packet
        self.packets.append(packet_info)
        
        # Update statistics
        self.statistics['total_packets'] += 1
        self.statistics['total_bytes'] += len(packet_data)
        
        if direction == 'TX':
            self.statistics['tx_packets'] += 1
            self.statistics['tx_bytes'] += len(packet_data)
        else:
            self.statistics['rx_packets'] += 1
            self.statistics['rx_bytes'] += len(packet_data)
            
        # Trigger callbacks
        for callback in self.packet_callbacks:
            try:
                callback(packet_info)
            except Exception as e:
                self._log_error(f"Callback error: {e}")
                
        print(f"📦 {direction} [{timestamp.strftime('%H:%M:%S.%f')[:-3]}] "
              f"{protocol} ({len(packet_data)} bytes): {decoded_str[:50]}...")
              
    def _detect_protocol(self, raw_data: bytes, decoded_str: str) -> str:
        """Detect protocol type"""
        # Check for binary indicators first
        if any(b >= 0x80 or (b < 0x20 and b not in [0x09, 0x0A, 0x0D]) for b in raw_data):
            return 'BINARY'
            
        # Check text-based protocols
        for protocol_name, pattern in self.protocol_patterns.items():
            if protocol_name != 'BINARY' and pattern.search(decoded_str):
                return protocol_name
                
        # Check for common binary protocol headers
        if raw_data:
            first_byte = raw_data[0]
            if first_byte in [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x0F, 0x10]:
                return 'MODBUS_RTU'
            elif first_byte == 0xAA:
                return 'CUSTOM_BINARY'
            elif first_byte in [0x7E, 0x7D]:
                return 'PPP_HDLC'
                
        return 'UNKNOWN'
        
    def _analyze_packet_content(self, raw_data: bytes, decoded_str: str, protocol: str) -> Dict:
        """Analyze packet content based on protocol"""
        analysis = {'fields': {}, 'errors': [], 'warnings': []}
        
        if protocol == 'AT_COMMANDS':
            analysis.update(self._analyze_at_command(decoded_str))
        elif protocol == 'NMEA_GPS':
            analysis.update(self._analyze_nmea_sentence(decoded_str))
        elif protocol == 'MODBUS_RTU':
            analysis.update(self._analyze_modbus_rtu(raw_data))
        elif protocol == 'JSON':
            analysis.update(self._analyze_json(decoded_str))
        elif protocol == 'CSV':
            analysis.update(self._analyze_csv(decoded_str))
            
        return analysis
        
    def _analyze_at_command(self, data: str) -> Dict:
        """Analyze AT command"""
        analysis = {'fields': {}, 'errors': [], 'warnings': []}
        
        if data.upper().startswith('AT'):
            if '+' in data:
                # Extended AT command
                parts = data.split('+', 1)
                if len(parts) > 1:
                    cmd_part = parts[1]
                    if '=' in cmd_part:
                        cmd, params = cmd_part.split('=', 1)
                        analysis['fields']['command'] = cmd
                        analysis['fields']['parameters'] = params
                    else:
                        analysis['fields']['command'] = cmd_part
            else:
                analysis['fields']['command'] = 'BASIC_AT'
                
        elif data in ['OK', 'ERROR', 'CONNECT', 'NO CARRIER']:
            analysis['fields']['response_type'] = data
            
        return analysis
        
    def _analyze_nmea_sentence(self, data: str) -> Dict:
        """Analyze NMEA sentence"""
        analysis = {'fields': {}, 'errors': [], 'warnings': []}
        
        try:
            # Validate checksum
            if '*' in data:
                sentence, checksum = data.split('*')
                calc_checksum = 0
                for char in sentence[1:]:  # Skip $
                    calc_checksum ^= ord(char)
                    
                expected_checksum = f"{calc_checksum:02X}"
                if checksum.upper() != expected_checksum:
                    analysis['errors'].append(f"Checksum mismatch: got {checksum}, expected {expected_checksum}")
                else:
                    analysis['fields']['checksum_valid'] = True
                    
            # Parse sentence type
            if data.startswith('$'):
                parts = data[1:].split('*')[0].split(',')
                if parts:
                    analysis['fields']['sentence_type'] = parts[0]
                    analysis['fields']['field_count'] = len(parts) - 1
                    
                    # Specific NMEA sentence analysis
                    if parts[0] == 'GPGGA' and len(parts) >= 15:
                        analysis['fields']['fix_quality'] = parts[6]
                        analysis['fields']['satellites'] = parts[7]
                        analysis['fields']['altitude'] = parts[9]
                        
        except Exception as e:
            analysis['errors'].append(f"NMEA parsing error: {e}")
            
        return analysis
        
    def _analyze_modbus_rtu(self, data: bytes) -> Dict:
        """Analyze Modbus RTU frame"""
        analysis = {'fields': {}, 'errors': [], 'warnings': []}
        
        if len(data) < 4:
            analysis['errors'].append("Frame too short for Modbus RTU")
            return analysis
            
        slave_id = data[0]
        function_code = data[1]
        
        analysis['fields']['slave_id'] = slave_id
        analysis['fields']['function_code'] = function_code
        
        # Validate CRC if frame is complete
        if len(data) >= 4:
            frame_data = data[:-2]
            received_crc = int.from_bytes(data[-2:], byteorder='little')
            calculated_crc = self._calculate_modbus_crc(frame_data)
            
            if received_crc != calculated_crc:
                analysis['errors'].append(f"CRC mismatch: got {received_crc:04X}, expected {calculated_crc:04X}")
            else:
                analysis['fields']['crc_valid'] = True
                
        # Analyze function code
        function_names = {
            0x01: "Read Coils",
            0x02: "Read Discrete Inputs", 
            0x03: "Read Holding Registers",
            0x04: "Read Input Registers",
            0x05: "Write Single Coil",
            0x06: "Write Single Register",
            0x0F: "Write Multiple Coils",
            0x10: "Write Multiple Registers"
        }
        
        if function_code in function_names:
            analysis['fields']['function_name'] = function_names[function_code]
        elif function_code & 0x80:
            analysis['fields']['function_name'] = "Exception Response"
            if len(data) > 2:
                analysis['fields']['exception_code'] = data[2]
                
        return analysis
        
    def _calculate_modbus_crc(self, data: bytes) -> int:
        """Calculate Modbus RTU CRC16"""
        crc = 0xFFFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 0x0001:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return crc
        
    def _analyze_json(self, data: str) -> Dict:
        """Analyze JSON data"""
        analysis = {'fields': {}, 'errors': [], 'warnings': []}
        
        try:
            parsed = json.loads(data)
            analysis['fields']['json_valid'] = True
            analysis['fields']['key_count'] = len(parsed) if isinstance(parsed, dict) else 0
            analysis['fields']['data_type'] = type(parsed).__name__
        except json.JSONDecodeError as e:
            analysis['errors'].append(f"JSON parsing error: {e}")
            
        return analysis
        
    def _analyze_csv(self, data: str) -> Dict:
        """Analyze CSV data"""
        analysis = {'fields': {}, 'errors': [], 'warnings': []}
        
        fields = data.split(',')
        analysis['fields']['field_count'] = len(fields)
        
        # Check for numeric fields
        numeric_count = 0
        for field in fields:
            try:
                float(field.strip())
                numeric_count += 1
            except ValueError:
                pass
                
        analysis['fields']['numeric_fields'] = numeric_count
        
        return analysis
        
    def _log_error(self, error_msg: str):
        """Log analysis error"""
        self.statistics['errors'] += 1
        
        error_info = {
            'timestamp': datetime.now(),
            'message': error_msg
        }
        
        for callback in self.error_callbacks:
            try:
                callback(error_info)
            except:
                pass
                
        print(f"❌ Analysis Error: {error_msg}")
        
    def add_packet_callback(self, callback: Callable):
        """Add packet detection callback"""
        self.packet_callbacks.append(callback)
        
    def add_error_callback(self, callback: Callable):
        """Add error callback"""
        self.error_callbacks.append(callback)
        
    def get_protocol_summary(self) -> Dict:
        """Get protocol distribution summary"""
        protocol_counts = {}
        
        for packet in self.packets:
            protocol = packet['protocol']
            protocol_counts[protocol] = protocol_counts.get(protocol, 0) + 1
            
        return protocol_counts
        
    def get_statistics(self) -> Dict:
        """Get analysis statistics"""
        stats = self.statistics.copy()
        
        if stats['start_time']:
            duration = datetime.now() - stats['start_time']
            stats['duration'] = duration.total_seconds()
            
            if stats['duration'] > 0:
                stats['packets_per_second'] = stats['total_packets'] / stats['duration']
                stats['bytes_per_second'] = stats['total_bytes'] / stats['duration']
                
        stats['protocol_distribution'] = self.get_protocol_summary()
        
        return stats
        
    def export_analysis(self, filename: str):
        """Export analysis results"""
        export_data = {
            'analysis_info': {
                'port': self.port,
                'baudrate': self.baudrate,
                'start_time': self.statistics['start_time'].isoformat() if self.statistics['start_time'] else None,
                'export_time': datetime.now().isoformat()
            },
            'statistics': self.get_statistics(),
            'packets': []
        }
        
        # Export packet data
        for packet in self.packets:
            packet_export = {
                'timestamp': packet['timestamp'].isoformat(),
                'direction': packet['direction'],
                'length': packet['length'],
                'protocol': packet['protocol'],
                'decoded': packet['decoded'],
                'analysis': packet['analysis']
            }
            export_data['packets'].append(packet_export)
            
        with open(filename, 'w') as f:
            json.dump(export_data, f, indent=2)
            
        print(f"📄 Analysis exported to {filename}")
        
    def stop_analysis(self):
        """Stop protocol analysis"""
        self.analyzing = False
        
        if hasattr(self, 'rx_thread'):
            self.rx_thread.join()
        if hasattr(self, 'tx_thread'):
            self.tx_thread.join()
            
        print("⏹️ Protocol analysis stopped")
        
    def close(self):
        """Close analyzer"""
        self.stop_analysis()
        if self.serial:
            self.serial.close()

# Example usage with callbacks
def packet_handler(packet_info):
    """Print AT commands as they are seen, or the errors of other packets."""
    if packet_info['protocol'] == 'AT_COMMANDS':
        print(f"🔔 AT Command detected: {packet_info['decoded']}")
        return

    # Only non-AT packets have their analysis inspected
    problems = packet_info['analysis']['errors']
    if len(problems) > 0:
        print(f"⚠️ Packet errors: {problems}")

def error_handler(error_info):
    """Print the 'message' entry of an analyzer error-info dict."""
    message = error_info['message']
    print(f"🚨 Analysis error: {message}")

# Create and use analyzer
analyzer = SerialProtocolAnalyzer('/dev/ttyUSB0', 115200)
analyzer.add_packet_callback(packet_handler)
analyzer.add_error_callback(error_handler)

connected = analyzer.connect()
if connected:
    analyzer.start_analysis()

    try:
        time.sleep(60)  # Analyze for 1 minute

        # Show statistics
        stats = analyzer.get_statistics()
        print("\n📊 Analysis Statistics:")
        summary_rows = (
            ("Total packets", 'total_packets'),
            ("Protocol distribution", 'protocol_distribution'),
            ("Error count", 'errors'),
        )
        for label, key in summary_rows:
            print(f"   {label}: {stats[key]}")

        # Export results
        analyzer.export_analysis('protocol_analysis.json')

    except KeyboardInterrupt:
        # Ctrl+C just ends the session early; cleanup happens below
        pass
    finally:
        analyzer.close()
import serial
import threading
import time
from datetime import datetime
from typing import List, Optional
import curses
import queue

class SerialHexViewer:
    def __init__(self, port: str, baudrate: int = 9600):
        self.port = port
        self.baudrate = baudrate
        self.serial = None
        self.running = False
        
        # Data storage
        self.data_queue = queue.Queue()
        self.display_buffer = []
        self.max_buffer_size = 1000
        
        # Display settings
        self.bytes_per_line = 16
        self.show_ascii = True
        self.show_timestamps = True
        self.highlight_changes = True
        
    def connect(self) -> bool:
        """Open the serial port with a 0.1 s read timeout.

        Returns:
            True once the port is open. False if opening raised; the
            exception is printed rather than propagated.
        """
        try:
            settings = dict(port=self.port, baudrate=self.baudrate, timeout=0.1)
            self.serial = serial.Serial(**settings)
        except Exception as e:
            print(f"Connection failed: {e}")
            return False
        return True
            
    def start_capture(self):
        """Start data capture"""
        if not self.serial:
            return False
            
        self.running = True
        self.capture_thread = threading.Thread(target=self._capture_worker)
        self.capture_thread.daemon = True
        self.capture_thread.start()
        
        return True
        
    def _capture_worker(self):
        """Background data capture"""
        while self.running:
            try:
                if self.serial.in_waiting:
                    data = self.serial.read(self.serial.in_waiting)
                    timestamp = datetime.now()
                    
                    # Add to queue
                    self.data_queue.put({
                        'timestamp': timestamp,
                        'data': data,
                        'direction': 'RX'
                    })
                    
            except Exception as e:
                print(f"Capture error: {e}")
                
            time.sleep(0.001)
            
    def format_hex_line(self, data: bytes, offset: int = 0) -> str:
        """Format bytes as hex line with ASCII"""
        hex_parts = []
        ascii_parts = []
        
        for i in range(self.bytes_per_line):
            if i < len(data):
                byte_val = data[i]
                hex_parts.append(f"{byte_val:02X}")
                
                # ASCII representation
                if 32 <= byte_val <= 126:
                    ascii_parts.append(chr(byte_val))
                else:
                    ascii_parts.append('.')
            else:
                hex_parts.append('  ')
                ascii_parts.append(' ')
                
        hex_str = ' '.join(hex_parts)
        ascii_str = ''.join(ascii_parts)
        
        # Add separators for readability
        hex_formatted = f"{hex_str[:23]} {hex_str[24:]}"
        
        line = f"{offset:08X}: {hex_formatted}"
        
        if self.show_ascii:
            line += f" |{ascii_str}|"
            
        return line
        
    def run_console_viewer(self):
        """Run console-based hex viewer"""
        if not self.serial:
            print("Not connected to serial port")
            return
            
        self.start_capture()
        
        try:
            while self.running:
                # Process queued data
                while not self.data_queue.empty():
                    try:
                        item = self.data_queue.get_nowait()
                        self._add_to_display_buffer(item)
                    except queue.Empty:
                        break
                        
                # Display recent data
                self._display_console()
                time.sleep(0.1)
                
        except KeyboardInterrupt:
            pass
        finally:
            self.stop_capture()
            
    def run_curses_viewer(self):
        """Run curses-based interactive hex viewer"""
        if not self.serial:
            print("Not connected to serial port")
            return
            
        self.start_capture()
        curses.wrapper(self._curses_main)
        self.stop_capture()
        
    def _curses_main(self, stdscr):
        """Run the interactive viewer loop on an initialized curses screen.

        Keys: Q quits, SPACE pauses or resumes intake of queued data, C
        clears the display buffer, and UP/DOWN/HOME/END scroll. Sets
        ``self.running`` to False when the loop ends.

        Args:
            stdscr: Window object supplied by ``curses.wrapper``.
        """
        # Initialize curses
        try:
            curses.curs_set(0)  # Hide cursor
        except curses.error:
            pass  # terminal cannot change cursor visibility; keep going
        stdscr.nodelay(1)   # Non-blocking input
        stdscr.timeout(100) # 100ms timeout

        # color_pair() is only valid once start_color() has run, so terminals
        # without color support get plain attributes instead.
        rx_attr = tx_attr = footer_attr = 0
        header_attr = curses.A_BOLD
        if curses.has_colors():
            curses.start_color()  # the function name is start_color, not start_colors
            curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK)  # RX data
            curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK)    # TX data
            curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK) # Timestamps
            curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK)   # Headers
            rx_attr = curses.color_pair(1)
            tx_attr = curses.color_pair(2)
            footer_attr = curses.color_pair(3)
            header_attr = curses.color_pair(4) | curses.A_BOLD

        def draw(row, text, width, attr=0):
            """Write one line clipped to the window; ignore off-screen writes."""
            try:
                stdscr.addstr(row, 0, text[:max(0, width - 1)], attr)
            except curses.error:
                pass  # window too small for this row

        display_offset = 0
        paused = False

        while self.running:
            # Handle keyboard input
            key = stdscr.getch()

            # Highest valid scroll position; 0 for an empty buffer so the
            # offset can never go negative and index from the end.
            last_line = max(0, len(self.display_buffer) - 1)

            if key in (ord('q'), ord('Q')):
                break
            elif key == ord(' '):
                paused = not paused
            elif key in (ord('c'), ord('C')):
                self.display_buffer.clear()
                display_offset = 0
            elif key == curses.KEY_UP:
                display_offset = max(0, display_offset - 1)
            elif key == curses.KEY_DOWN:
                display_offset = min(last_line, display_offset + 1)
            elif key == curses.KEY_HOME:
                display_offset = 0
            elif key == curses.KEY_END:
                display_offset = last_line

            # Process new data if not paused
            if not paused:
                while not self.data_queue.empty():
                    try:
                        item = self.data_queue.get_nowait()
                        self._add_to_display_buffer(item)
                    except queue.Empty:
                        break

            # Clear screen and draw
            stdscr.clear()
            height, width = stdscr.getmaxyx()

            # Draw header
            header = f"Serial Hex Viewer - {self.port} @ {self.baudrate} baud"
            draw(0, header, width, header_attr)

            status = f"{'PAUSED' if paused else 'RUNNING'} | Lines: {len(self.display_buffer)} | "
            status += "Commands: Q=Quit, SPACE=Pause, C=Clear, Arrow Keys=Scroll"
            draw(1, status, width)

            # Draw separator
            draw(2, "-" * max(0, width - 1), width)

            # Draw hex data
            start_row = 3
            visible_lines = height - start_row - 1

            for i in range(visible_lines):
                line_idx = display_offset + i
                if line_idx >= len(self.display_buffer):
                    break

                display_line = self.display_buffer[line_idx]

                # Choose color based on direction
                line_attr = rx_attr if display_line['direction'] == 'RX' else tx_attr

                # Format line
                if self.show_timestamps:
                    timestamp_str = display_line['timestamp'].strftime("%H:%M:%S.%f")[:-3]
                    line_text = f"[{timestamp_str}] {display_line['direction']}: {display_line['formatted']}"
                else:
                    line_text = f"{display_line['direction']}: {display_line['formatted']}"

                draw(start_row + i, line_text, width, line_attr)

            # Draw footer
            footer = f"Offset: {display_offset}/{max(0, len(self.display_buffer) - 1)} | "
            footer += "Press 'q' to quit, SPACE to pause/resume"
            draw(height - 1, footer, width, footer_attr)

            # Refresh screen
            stdscr.refresh()

        self.running = False
        
    def _add_to_display_buffer(self, item):
        """Add item to display buffer"""
        data = item['data']
        timestamp = item['timestamp']
        direction = item['direction']
        
        # Split data into lines
        offset = 0
        while offset < len(data):
            line_data = data[offset:offset + self.bytes_per_line]
            formatted_line = self.format_hex_line(line_data, offset)
            
            self.display_buffer.append({
                'timestamp': timestamp,
                'direction': direction,
                'formatted': formatted_line,
                'raw_data': line_data
            })
            
            offset += self.bytes_per_line
            
        # Limit buffer size
        if len(self.display_buffer) > self.max_buffer_size:
            self.display_buffer = self.display_buffer[-self.max_buffer_size:]
            
    def _display_console(self):
        """Display in console (simple version)"""
        # Clear screen
        print('\033[2J\033[H', end='')
        
        # Show recent entries
        recent_entries = self.display_buffer[-20:]  # Last 20 lines
        
        print(f"Serial Hex Viewer - {self.port} @ {self.baudrate} baud")
        print("=" * 60)
        
        for entry in recent_entries:
            timestamp_str = entry['timestamp'].strftime("%H:%M:%S.%f")[:-3]
            direction = entry['direction']
            formatted = entry['formatted']
            
            print(f"[{timestamp_str}] {direction}: {formatted}")
            
        print(f"\nTotal lines: {len(self.display_buffer)} (Press Ctrl+C to stop)")
        
    def send_data(self, data: bytes):
        """Send data and add to display"""
        if self.serial:
            self.serial.write(data)
            
            # Add to display as TX
            self.data_queue.put({
                'timestamp': datetime.now(),
                'data': data,
                'direction': 'TX'
            })
            
    def save_capture(self, filename: str):
        """Save captured data to file"""
        with open(filename, 'w') as f:
            f.write(f"Serial Hex Capture - {self.port} @ {self.baudrate} baud\n")
            f.write(f"Captured: {datetime.now()}\n")
            f.write("=" * 70 + "\n\n")
            
            for entry in self.display_buffer:
                timestamp_str = entry['timestamp'].strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                direction = entry['direction']
                formatted = entry['formatted']
                
                f.write(f"[{timestamp_str}] {direction}: {formatted}\n")
                
        print(f"Capture saved to {filename}")
        
    def stop_capture(self):
        """Stop data capture"""
        self.running = False
        
        if hasattr(self, 'capture_thread'):
            self.capture_thread.join()
            
    def close(self):
        """Close viewer"""
        self.stop_capture()
        if self.serial:
            self.serial.close()

# Example usage
hex_viewer = SerialHexViewer('/dev/ttyUSB0', 115200)

if hex_viewer.connect():
    menu_lines = (
        "Starting hex viewer...",
        "Choose display mode:",
        "1. Console viewer",
        "2. Interactive curses viewer",
    )
    for menu_line in menu_lines:
        print(menu_line)

    choice = input("Enter choice (1 or 2): ")

    try:
        # Anything other than '2' falls back to the console viewer
        if choice == '2':
            run_viewer = hex_viewer.run_curses_viewer
        else:
            run_viewer = hex_viewer.run_console_viewer
        run_viewer()

        # Save capture
        hex_viewer.save_capture('serial_capture.txt')

    finally:
        hex_viewer.close()
import serial
import threading
import time
from datetime import datetime, timedelta
from typing import List, Dict, Tuple
import statistics
import matplotlib.pyplot as plt
import numpy as np

class SerialTimingAnalyzer:
    """Measure byte- and packet-level timing of incoming serial data.

    A background thread reads the port one byte at a time, stamps every byte
    with ``time.perf_counter()`` and groups bytes into packets: a packet ends
    once no byte has arrived for ``packet_timeout`` seconds. The collected
    samples feed the statistics, anomaly, jitter, plotting and report methods.
    """

    def __init__(self, port: str, baudrate: int = 9600):
        """Store the connection settings and create empty sample buffers.

        Args:
            port: Device name passed to ``serial.Serial``.
            baudrate: Line speed in baud; also used for theoretical timing.
        """
        self.port = port
        self.baudrate = baudrate
        self.serial = None
        self.analyzing = False

        # Timing data storage (all values are perf_counter seconds)
        self.byte_timestamps = []
        self.packet_timestamps = []
        self.inter_byte_times = []
        self.inter_packet_times = []

        # Analysis results
        self.timing_stats = {}

        # Configuration
        self.packet_timeout = 0.01  # 10ms of silence ends a packet
        self.max_data_points = 10000

    def connect(self) -> bool:
        """Open the serial port.

        Returns:
            True on success; False (after printing the error) on failure.
        """
        try:
            self.serial = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=0.001  # Very short timeout for precise timing
            )
            return True
        except Exception as e:
            print(f"Connection failed: {e}")
            return False

    def start_analysis(self):
        """Start the background timing thread.

        Returns:
            False when no port is open, True once the thread is started.
        """
        if not self.serial:
            return False

        self.analyzing = True
        self.analysis_thread = threading.Thread(target=self._timing_worker)
        self.analysis_thread.daemon = True
        self.analysis_thread.start()

        print("⏱️ Timing analysis started")
        return True

    def _timing_worker(self):
        """Thread body: timestamp incoming bytes and split them into packets."""
        current_packet_start = None
        last_byte_time = None
        packet_bytes = []

        while self.analyzing:
            try:
                if self.serial.in_waiting:
                    # High-precision timestamp, taken just before the read
                    byte_time = time.perf_counter()
                    byte_data = self.serial.read(1)

                    if byte_data:
                        # Record byte timestamp
                        self.byte_timestamps.append(byte_time)
                        packet_bytes.append(byte_data[0])

                        # Calculate inter-byte time
                        if last_byte_time is not None:
                            self.inter_byte_times.append(byte_time - last_byte_time)

                        last_byte_time = byte_time

                        # The first byte after a gap opens a new packet
                        if current_packet_start is None:
                            current_packet_start = byte_time

                else:
                    # Nothing pending: close the open packet once the line
                    # has been silent for longer than packet_timeout
                    if (current_packet_start is not None and
                        last_byte_time is not None and
                        time.perf_counter() - last_byte_time > self.packet_timeout):

                        # Record packet timing
                        self.packet_timestamps.append({
                            'start_time': current_packet_start,
                            'end_time': last_byte_time,
                            'duration': last_byte_time - current_packet_start,
                            'byte_count': len(packet_bytes),
                            'bytes': packet_bytes.copy()
                        })

                        # Gap between the previous packet's last byte and
                        # this packet's first byte
                        if len(self.packet_timestamps) > 1:
                            prev_packet = self.packet_timestamps[-2]
                            self.inter_packet_times.append(
                                current_packet_start - prev_packet['end_time']
                            )

                        # Reset for next packet
                        current_packet_start = None
                        packet_bytes.clear()

                # Limit data points to prevent memory issues. Both lists are
                # cut to the same length here, which detect_timing_anomalies
                # accounts for when mapping samples back to timestamps.
                if len(self.byte_timestamps) > self.max_data_points:
                    self.byte_timestamps = self.byte_timestamps[-self.max_data_points//2:]
                    self.inter_byte_times = self.inter_byte_times[-self.max_data_points//2:]

            except Exception as e:
                print(f"Timing analysis error: {e}")

            time.sleep(0.0001)  # Minimal sleep for high precision

    @staticmethod
    def _summarize_ms(samples) -> Dict:
        """Summarize non-empty *samples* (seconds) as millisecond statistics."""
        return {
            'count': len(samples),
            'mean': statistics.mean(samples) * 1000,  # Convert to ms
            'median': statistics.median(samples) * 1000,
            'min': min(samples) * 1000,
            'max': max(samples) * 1000,
            # stdev needs at least two samples
            'std_dev': statistics.stdev(samples) * 1000 if len(samples) > 1 else 0
        }

    def calculate_statistics(self) -> Dict:
        """Compute timing statistics from the samples collected so far.

        Returns:
            A dict that is also stored in ``self.timing_stats``. Sections
            ('inter_byte', 'inter_packet', 'packets', 'theoretical',
            'comparison') are present only when the data they need exists;
            'theoretical' and 'comparison' additionally require a positive
            baud rate. Times are in milliseconds unless the key says otherwise.
        """
        stats = {}

        # Inter-byte timing statistics
        if self.inter_byte_times:
            stats['inter_byte'] = self._summarize_ms(self.inter_byte_times)

        # Inter-packet timing statistics
        if self.inter_packet_times:
            stats['inter_packet'] = self._summarize_ms(self.inter_packet_times)

        # Packet statistics
        if self.packet_timestamps:
            packet_durations = [p['duration'] for p in self.packet_timestamps]
            packet_sizes = [p['byte_count'] for p in self.packet_timestamps]

            stats['packets'] = {
                'count': len(self.packet_timestamps),
                'duration_mean': statistics.mean(packet_durations) * 1000,
                'duration_min': min(packet_durations) * 1000,
                'duration_max': max(packet_durations) * 1000,
                'size_mean': statistics.mean(packet_sizes),
                'size_min': min(packet_sizes),
                'size_max': max(packet_sizes)
            }

        # Theoretical timing is undefined for a non-positive baud rate, so
        # those sections are skipped instead of dividing by zero
        if self.baudrate and self.baudrate > 0:
            theoretical_bit_time = 1.0 / self.baudrate
            theoretical_byte_time = theoretical_bit_time * 10  # 8 data + 1 start + 1 stop

            stats['theoretical'] = {
                'bit_time_us': theoretical_bit_time * 1000000,
                'byte_time_ms': theoretical_byte_time * 1000,
                'bytes_per_second': 1.0 / theoretical_byte_time
            }

            # Compare with actual measurements
            if self.inter_byte_times:
                actual_byte_time = statistics.mean(self.inter_byte_times)
                stats['comparison'] = {
                    'theoretical_byte_time_ms': theoretical_byte_time * 1000,
                    'actual_byte_time_ms': actual_byte_time * 1000,
                    'timing_accuracy': (1 - abs(actual_byte_time - theoretical_byte_time) / theoretical_byte_time) * 100
                }

        self.timing_stats = stats
        return stats

    def detect_timing_anomalies(self, threshold_multiplier: float = 3.0) -> List[Dict]:
        """Find inter-byte gaps longer than mean + multiplier * std-dev.

        Args:
            threshold_multiplier: Number of standard deviations above the
                mean at which a gap counts as an anomaly.

        Returns:
            One dict per anomaly with its sample 'index', 'time' and
            'threshold' in ms, 'ratio' to the mean, and the perf_counter
            'timestamp' of the byte that ended the gap (None if unavailable).
        """
        samples = self.inter_byte_times
        stamps = self.byte_timestamps
        anomalies = []

        if not samples:
            return anomalies

        # Calculate threshold based on statistics
        mean_time = statistics.mean(samples)
        std_dev = statistics.stdev(samples) if len(samples) > 1 else 0
        threshold = mean_time + (threshold_multiplier * std_dev)

        # Sample i is the gap that ended at byte_timestamps[i + offset].
        # Before the worker trims its buffers the timestamp list is one entry
        # longer (offset 1); a trim cuts both lists to the same length
        # (offset 0). Aligning from the end covers both states.
        offset = len(stamps) - len(samples)

        # Find anomalies
        for i, inter_time in enumerate(samples):
            if inter_time > threshold:
                stamp_index = i + offset
                anomalies.append({
                    'index': i,
                    'time': inter_time * 1000,  # Convert to ms
                    'threshold': threshold * 1000,
                    'ratio': inter_time / mean_time if mean_time else float('inf'),
                    'timestamp': stamps[stamp_index] if 0 <= stamp_index < len(stamps) else None
                })

        return anomalies

    def analyze_jitter(self) -> Dict:
        """Compute jitter metrics (in ms) from the inter-byte samples.

        Returns:
            An empty dict without samples; otherwise mean/max deviation from
            the mean, RMS jitter, peak-to-peak jitter and the coefficient of
            variation in percent (0.0 when it is undefined, i.e. with fewer
            than two samples or a zero mean).
        """
        if not self.inter_byte_times:
            return {}

        times_ms = [t * 1000 for t in self.inter_byte_times]

        # Calculate jitter metrics
        mean_time = statistics.mean(times_ms)
        deviations = [abs(t - mean_time) for t in times_ms]

        # statistics.stdev raises StatisticsError below two samples
        if len(times_ms) > 1 and mean_time:
            coefficient_of_variation = (statistics.stdev(times_ms) / mean_time) * 100
        else:
            coefficient_of_variation = 0.0

        jitter_analysis = {
            'mean_deviation': statistics.mean(deviations),
            'max_deviation': max(deviations),
            'rms_jitter': np.sqrt(np.mean([d**2 for d in deviations])),
            'peak_to_peak_jitter': max(times_ms) - min(times_ms),
            'coefficient_of_variation': coefficient_of_variation
        }

        return jitter_analysis

    def create_timing_plots(self, save_plots: bool = True):
        """Draw a 2x2 figure of the timing data and show it.

        Args:
            save_plots: When True, also save the figure as a timestamped PNG
                in the current directory before showing it.
        """
        if not self.inter_byte_times and not self.inter_packet_times:
            print("No timing data to plot")
            return

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle(f'Serial Timing Analysis - {self.port} @ {self.baudrate} baud')

        if self.inter_byte_times:
            # Inter-byte timing histogram
            times_ms = [t * 1000 for t in self.inter_byte_times]
            axes[0, 0].hist(times_ms, bins=50, alpha=0.7, edgecolor='black')
            axes[0, 0].set_title('Inter-byte Timing Distribution')
            axes[0, 0].set_xlabel('Time (ms)')
            axes[0, 0].set_ylabel('Count')
            axes[0, 0].grid(True, alpha=0.3)

            # Add theoretical line (10 bit times per byte); needs a valid baud rate
            if self.baudrate and self.baudrate > 0:
                theoretical_ms = (1.0 / self.baudrate) * 10 * 1000
                axes[0, 0].axvline(theoretical_ms, color='red', linestyle='--',
                                   label=f'Theoretical: {theoretical_ms:.3f}ms')
                axes[0, 0].legend()

            # Inter-byte timing over time
            recent_ms = [t * 1000 for t in self.inter_byte_times[-1000:]]  # Last 1000 points
            axes[0, 1].plot(recent_ms, alpha=0.7)
            axes[0, 1].set_title('Inter-byte Timing Over Time')
            axes[0, 1].set_xlabel('Sample Number')
            axes[0, 1].set_ylabel('Time (ms)')
            axes[0, 1].grid(True, alpha=0.3)

        # Packet timing
        if self.packet_timestamps:
            durations_ms = [p['duration'] * 1000 for p in self.packet_timestamps]
            sizes = [p['byte_count'] for p in self.packet_timestamps]

            axes[1, 0].scatter(sizes, durations_ms, alpha=0.6)
            axes[1, 0].set_title('Packet Duration vs Size')
            axes[1, 0].set_xlabel('Packet Size (bytes)')
            axes[1, 0].set_ylabel('Duration (ms)')
            axes[1, 0].grid(True, alpha=0.3)

        # Inter-packet timing
        if self.inter_packet_times:
            gaps_ms = [t * 1000 for t in self.inter_packet_times]
            axes[1, 1].hist(gaps_ms, bins=50, alpha=0.7, edgecolor='black')
            axes[1, 1].set_title('Inter-packet Timing Distribution')
            axes[1, 1].set_xlabel('Time (ms)')
            axes[1, 1].set_ylabel('Count')
            axes[1, 1].grid(True, alpha=0.3)

        plt.tight_layout()

        if save_plots:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f'timing_analysis_{timestamp}.png'
            plt.savefig(filename, dpi=300, bbox_inches='tight')
            print(f"📊 Timing plots saved to {filename}")

        plt.show()

    def generate_timing_report(self) -> str:
        """Build a plain-text report from statistics, anomalies and jitter.

        Returns:
            The report as one newline-joined string; sections without data
            are omitted.
        """
        stats = self.calculate_statistics()
        anomalies = self.detect_timing_anomalies()
        jitter = self.analyze_jitter()

        report = []
        report.append("Serial Timing Analysis Report")
        report.append("=" * 50)
        report.append(f"Port: {self.port}")
        report.append(f"Baudrate: {self.baudrate}")
        report.append(f"Analysis Time: {datetime.now()}")
        report.append("")

        # Theoretical timing
        if 'theoretical' in stats:
            report.append("Theoretical Timing:")
            report.append(f"  Bit time: {stats['theoretical']['bit_time_us']:.2f} μs")
            report.append(f"  Byte time: {stats['theoretical']['byte_time_ms']:.3f} ms")
            report.append(f"  Max bytes/sec: {stats['theoretical']['bytes_per_second']:.0f}")
            report.append("")

        # Inter-byte timing
        if 'inter_byte' in stats:
            ib = stats['inter_byte']
            report.append("Inter-byte Timing:")
            report.append(f"  Samples: {ib['count']}")
            report.append(f"  Mean: {ib['mean']:.3f} ms")
            report.append(f"  Median: {ib['median']:.3f} ms")
            report.append(f"  Min: {ib['min']:.3f} ms")
            report.append(f"  Max: {ib['max']:.3f} ms")
            report.append(f"  Std Dev: {ib['std_dev']:.3f} ms")
            report.append("")

        # Timing accuracy
        if 'comparison' in stats:
            comp = stats['comparison']
            report.append("Timing Accuracy:")
            report.append(f"  Theoretical: {comp['theoretical_byte_time_ms']:.3f} ms")
            report.append(f"  Actual: {comp['actual_byte_time_ms']:.3f} ms")
            report.append(f"  Accuracy: {comp['timing_accuracy']:.1f}%")
            report.append("")

        # Jitter analysis
        if jitter:
            report.append("Jitter Analysis:")
            report.append(f"  Mean deviation: {jitter['mean_deviation']:.3f} ms")
            report.append(f"  Max deviation: {jitter['max_deviation']:.3f} ms")
            report.append(f"  RMS jitter: {jitter['rms_jitter']:.3f} ms")
            report.append(f"  Peak-to-peak: {jitter['peak_to_peak_jitter']:.3f} ms")
            report.append(f"  Coefficient of variation: {jitter['coefficient_of_variation']:.2f}%")
            report.append("")

        # Anomalies
        report.append(f"Timing Anomalies: {len(anomalies)}")
        if anomalies:
            report.append("  (Showing first 10)")
            for i, anomaly in enumerate(anomalies[:10]):
                report.append(f"  {i+1}. {anomaly['time']:.3f} ms (ratio: {anomaly['ratio']:.1f}x)")
            report.append("")

        # Packet statistics
        if 'packets' in stats:
            pkt = stats['packets']
            report.append("Packet Statistics:")
            report.append(f"  Total packets: {pkt['count']}")
            report.append(f"  Mean duration: {pkt['duration_mean']:.3f} ms")
            report.append(f"  Mean size: {pkt['size_mean']:.1f} bytes")
            report.append("")

        return "\n".join(report)

    def stop_analysis(self):
        """Clear the analyzing flag and wait for the worker thread to exit."""
        self.analyzing = False

        if hasattr(self, 'analysis_thread'):
            self.analysis_thread.join()

        print("⏱️ Timing analysis stopped")

    def close(self):
        """Stop the analysis, then close the serial port if it was opened."""
        self.stop_analysis()
        if self.serial:
            self.serial.close()

# Example usage: collect timing data for 30 seconds, then print, plot and
# save the resulting report.
timing_analyzer = SerialTimingAnalyzer('/dev/ttyUSB0', 115200)

if timing_analyzer.connect():
    timing_analyzer.start_analysis()
    
    try:
        # The worker thread gathers samples while the main thread sleeps
        print("Analyzing timing for 30 seconds...")
        time.sleep(30)
        
        # Generate report
        report = timing_analyzer.generate_timing_report()
        print(report)
        
        # Create plots (plt.show() inside blocks until the window is closed)
        timing_analyzer.create_timing_plots(save_plots=True)
        
        # Save report
        with open('timing_report.txt', 'w') as f:
            f.write(report)
        print("📄 Report saved to timing_report.txt")
        
    except KeyboardInterrupt:
        # Ctrl+C ends the run early; cleanup still happens below
        pass
    finally:
        # Stop the worker thread and release the port in every case
        timing_analyzer.close()

Development Utilities

Serial Port Monitor

import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, filedialog
import serial
import serial.tools.list_ports
import threading
import time
from datetime import datetime
import json
import queue
import re

class SerialPortMonitor:
    def __init__(self):
        """Create the main window, the shared state, the widgets and the
        background thread that forwards queued data to the display."""
        # The Tk root must exist before any tk.*Var below is created
        self.root = tk.Tk()
        self.root.title("PySerial Development Monitor")
        self.root.geometry("1200x800")
        
        # Serial connection
        self.serial = None
        self.connected = tk.BooleanVar(value=False)
        self.monitoring = tk.BooleanVar(value=False)
        
        # Data handling: items are queued as dicts with 'timestamp',
        # 'direction' and 'data' keys and displayed on the GUI side
        self.data_queue = queue.Queue()
        self.auto_scroll = tk.BooleanVar(value=True)
        self.display_hex = tk.BooleanVar(value=False)
        self.display_timestamps = tk.BooleanVar(value=True)
        self.log_to_file = tk.BooleanVar(value=False)
        self.log_file = None
        
        # Statistics
        self.stats = {
            'bytes_received': 0,
            'bytes_sent': 0,
            'lines_received': 0,
            'start_time': None
        }
        
        # Create interface
        self._create_interface()
        
        # Start data processor (daemon thread, so it never blocks interpreter exit)
        self.data_processor = threading.Thread(target=self._process_data_queue)
        self.data_processor.daemon = True
        self.data_processor.start()
        
        # Update GUI periodically
        # NOTE(review): _update_gui is defined outside this excerpt; presumably
        # it reschedules itself with root.after — confirm.
        self._update_gui()
        
    def _create_interface(self):
        """Build the notebook and fill one tab per tool area."""
        notebook = ttk.Notebook(self.root)
        notebook.pack(fill='both', expand=True, padx=5, pady=5)

        # (tab title, name of the method that populates it), in display order
        tab_layout = (
            ("Monitor", '_create_monitor_tab'),
            ("Send Data", '_create_send_tab'),
            ("Statistics", '_create_statistics_tab'),
            ("Settings", '_create_settings_tab'),
        )

        for title, builder_name in tab_layout:
            page = ttk.Frame(notebook)
            notebook.add(page, text=title)
            getattr(self, builder_name)(page)
        
    def _create_monitor_tab(self, parent):
        """Build the Monitor tab inside *parent*: connection row, control
        buttons with display options, and the scrolling data view."""
        # Connection frame
        conn_frame = ttk.LabelFrame(parent, text="Connection")
        conn_frame.pack(fill='x', padx=5, pady=5)
        
        # Port selection (the list starts empty; "Refresh" fills it)
        ttk.Label(conn_frame, text="Port:").grid(row=0, column=0, padx=5, pady=5, sticky='w')
        self.port_var = tk.StringVar()
        self.port_combo = ttk.Combobox(conn_frame, textvariable=self.port_var, width=15)
        self.port_combo.grid(row=0, column=1, padx=5, pady=5)
        
        ttk.Button(conn_frame, text="Refresh", command=self._refresh_ports).grid(row=0, column=2, padx=5)
        
        # Baudrate selection (kept as text; parsed with int() on connect)
        ttk.Label(conn_frame, text="Baudrate:").grid(row=0, column=3, padx=5, pady=5, sticky='w')
        self.baudrate_var = tk.StringVar(value="9600")
        baudrate_combo = ttk.Combobox(conn_frame, textvariable=self.baudrate_var, 
                                     values=["1200", "2400", "4800", "9600", "19200", "38400", "57600", "115200"], 
                                     width=10)
        baudrate_combo.grid(row=0, column=4, padx=5, pady=5)
        
        # Connection buttons
        ttk.Button(conn_frame, text="Connect", command=self._connect).grid(row=0, column=5, padx=5)
        ttk.Button(conn_frame, text="Disconnect", command=self._disconnect).grid(row=0, column=6, padx=5)
        
        # Status (text is updated by _connect and _disconnect)
        self.status_label = ttk.Label(conn_frame, text="Disconnected")
        self.status_label.grid(row=0, column=7, padx=20, pady=5)
        
        # Control frame
        control_frame = ttk.LabelFrame(parent, text="Control")
        control_frame.pack(fill='x', padx=5, pady=5)
        
        ttk.Button(control_frame, text="Start Monitor", command=self._start_monitoring).pack(side='left', padx=5)
        ttk.Button(control_frame, text="Stop Monitor", command=self._stop_monitoring).pack(side='left', padx=5)
        ttk.Button(control_frame, text="Clear Display", command=self._clear_display).pack(side='left', padx=5)
        ttk.Button(control_frame, text="Save Log", command=self._save_log).pack(side='left', padx=5)
        
        # Display options (packed from the right, so they appear in reverse order)
        ttk.Checkbutton(control_frame, text="Auto Scroll", variable=self.auto_scroll).pack(side='right', padx=5)
        ttk.Checkbutton(control_frame, text="Show Hex", variable=self.display_hex).pack(side='right', padx=5)
        ttk.Checkbutton(control_frame, text="Timestamps", variable=self.display_timestamps).pack(side='right', padx=5)
        
        # Data display
        display_frame = ttk.LabelFrame(parent, text="Data")
        display_frame.pack(fill='both', expand=True, padx=5, pady=5)
        
        self.data_text = scrolledtext.ScrolledText(display_frame, wrap=tk.WORD, font=('Consolas', 10))
        self.data_text.pack(fill='both', expand=True, padx=5, pady=5)
        
        # Configure text tags for coloring
        self.data_text.tag_configure('rx', foreground='blue')
        self.data_text.tag_configure('tx', foreground='red')
        self.data_text.tag_configure('timestamp', foreground='gray')
        self.data_text.tag_configure('hex', foreground='green')
        
    def _create_send_tab(self, parent):
        """Build the Send Data tab inside *parent*: single-line send,
        multi-line send with a per-line delay, and hex send."""
        # Send frame
        send_frame = ttk.LabelFrame(parent, text="Send Data")
        send_frame.pack(fill='x', padx=5, pady=5)
        
        # Single line send (pressing Return in the entry also sends)
        ttk.Label(send_frame, text="Send Text:").grid(row=0, column=0, padx=5, pady=5, sticky='w')
        self.send_text_var = tk.StringVar()
        send_entry = ttk.Entry(send_frame, textvariable=self.send_text_var, width=50)
        send_entry.grid(row=0, column=1, padx=5, pady=5, sticky='ew')
        send_entry.bind('<Return>', lambda e: self._send_text())
        
        ttk.Button(send_frame, text="Send", command=self._send_text).grid(row=0, column=2, padx=5)
        
        # Line ending options. The values are the escaped text (a backslash
        # followed by a letter), not control characters.
        # NOTE(review): presumably the send handlers translate them — confirm.
        ttk.Label(send_frame, text="Line Ending:").grid(row=1, column=0, padx=5, pady=5, sticky='w')
        self.line_ending_var = tk.StringVar(value="\\r\\n")
        ending_combo = ttk.Combobox(send_frame, textvariable=self.line_ending_var, 
                                   values=["None", "\\n", "\\r", "\\r\\n"], width=10)
        ending_combo.grid(row=1, column=1, padx=5, pady=5, sticky='w')
        
        # Let the entry column absorb extra horizontal space
        send_frame.columnconfigure(1, weight=1)
        
        # Multi-line send
        multi_frame = ttk.LabelFrame(parent, text="Multi-line Send")
        multi_frame.pack(fill='both', expand=True, padx=5, pady=5)
        
        self.multi_text = scrolledtext.ScrolledText(multi_frame, height=10, font=('Consolas', 10))
        self.multi_text.pack(fill='both', expand=True, padx=5, pady=5)
        
        # Multi-line controls
        multi_control = ttk.Frame(multi_frame)
        multi_control.pack(fill='x', padx=5, pady=5)
        
        ttk.Button(multi_control, text="Send All", command=self._send_multi_text).pack(side='left', padx=5)
        ttk.Button(multi_control, text="Clear", command=lambda: self.multi_text.delete(1.0, tk.END)).pack(side='left', padx=5)
        ttk.Button(multi_control, text="Load File", command=self._load_send_file).pack(side='left', padx=5)
        
        # Delay option for multi-line (label and entry are packed from the right)
        ttk.Label(multi_control, text="Line Delay (ms):").pack(side='right', padx=5)
        self.line_delay_var = tk.StringVar(value="100")
        ttk.Entry(multi_control, textvariable=self.line_delay_var, width=10).pack(side='right', padx=5)
        
        # Hex send
        hex_frame = ttk.LabelFrame(parent, text="Send Hex")
        hex_frame.pack(fill='x', padx=5, pady=5)
        
        ttk.Label(hex_frame, text="Hex Data:").grid(row=0, column=0, padx=5, pady=5, sticky='w')
        self.hex_var = tk.StringVar()
        hex_entry = ttk.Entry(hex_frame, textvariable=self.hex_var, width=50)
        hex_entry.grid(row=0, column=1, padx=5, pady=5, sticky='ew')
        
        ttk.Button(hex_frame, text="Send Hex", command=self._send_hex).grid(row=0, column=2, padx=5)
        
        # Let the hex entry column absorb extra horizontal space
        hex_frame.columnconfigure(1, weight=1)
        
    def _create_statistics_tab(self, parent):
        """Build the Statistics tab: a text area plus update/reset/export buttons."""
        group = ttk.LabelFrame(parent, text="Communication Statistics")
        group.pack(fill='both', expand=True, padx=5, pady=5)

        self.stats_text = scrolledtext.ScrolledText(group, height=20, font=('Consolas', 10))
        self.stats_text.pack(fill='both', expand=True, padx=5, pady=5)

        # Button row below the text area
        toolbar = ttk.Frame(group)
        toolbar.pack(fill='x', padx=5, pady=5)

        # (caption, name of the handler method), left to right
        actions = (
            ("Update Stats", '_update_statistics'),
            ("Reset Stats", '_reset_statistics'),
            ("Export Stats", '_export_statistics'),
        )
        for caption, handler_name in actions:
            button = ttk.Button(toolbar, text=caption, command=getattr(self, handler_name))
            button.pack(side='left', padx=5)
        
    def _create_settings_tab(self, parent):
        """Build the Settings tab inside *parent*: display options, display
        buffer limit and the serial framing parameters used on connect."""
        # Display settings
        display_frame = ttk.LabelFrame(parent, text="Display Settings")
        display_frame.pack(fill='x', padx=5, pady=5)
        
        ttk.Checkbutton(display_frame, text="Log to File", variable=self.log_to_file).pack(anchor='w', padx=5, pady=2)
        
        # Font settings (the size takes effect when "Apply" is pressed)
        font_frame = ttk.Frame(display_frame)
        font_frame.pack(fill='x', padx=5, pady=5)
        
        ttk.Label(font_frame, text="Font Size:").pack(side='left', padx=5)
        self.font_size_var = tk.StringVar(value="10")
        font_spin = tk.Spinbox(font_frame, from_=8, to=20, width=5, textvariable=self.font_size_var)
        font_spin.pack(side='left', padx=5)
        
        ttk.Button(font_frame, text="Apply", command=self._update_font).pack(side='left', padx=5)
        
        # Buffer settings (read as int by _display_data to trim old lines)
        buffer_frame = ttk.LabelFrame(parent, text="Buffer Settings")
        buffer_frame.pack(fill='x', padx=5, pady=5)
        
        ttk.Label(buffer_frame, text="Max Display Lines:").grid(row=0, column=0, padx=5, pady=5, sticky='w')
        self.max_lines_var = tk.StringVar(value="1000")
        ttk.Entry(buffer_frame, textvariable=self.max_lines_var, width=10).grid(row=0, column=1, padx=5, pady=5)
        
        # Advanced settings: the text values below are mapped to pyserial
        # settings in _connect
        advanced_frame = ttk.LabelFrame(parent, text="Advanced Settings")
        advanced_frame.pack(fill='x', padx=5, pady=5)
        
        ttk.Label(advanced_frame, text="Data Bits:").grid(row=0, column=0, padx=5, pady=5, sticky='w')
        self.databits_var = tk.StringVar(value="8")
        ttk.Combobox(advanced_frame, textvariable=self.databits_var, values=["5", "6", "7", "8"], width=5).grid(row=0, column=1, padx=5, pady=5)
        
        ttk.Label(advanced_frame, text="Parity:").grid(row=0, column=2, padx=5, pady=5, sticky='w')
        self.parity_var = tk.StringVar(value="None")
        ttk.Combobox(advanced_frame, textvariable=self.parity_var, values=["None", "Even", "Odd"], width=8).grid(row=0, column=3, padx=5, pady=5)
        
        ttk.Label(advanced_frame, text="Stop Bits:").grid(row=1, column=0, padx=5, pady=5, sticky='w')
        self.stopbits_var = tk.StringVar(value="1")
        ttk.Combobox(advanced_frame, textvariable=self.stopbits_var, values=["1", "1.5", "2"], width=5).grid(row=1, column=1, padx=5, pady=5)
        
    def _refresh_ports(self):
        """Reload the port drop-down from the ports currently on the system.

        When nothing is selected yet, the first detected port is preselected.
        """
        detected = [info.device for info in serial.tools.list_ports.comports()]
        self.port_combo['values'] = detected

        # Keep an existing selection; only fill in a default when empty
        if not detected or self.port_var.get():
            return

        self.port_var.set(detected[0])
            
    def _connect(self):
        """Connect to serial port"""
        if self.connected.get():
            return
            
        port = self.port_var.get()
        if not port:
            messagebox.showerror("Error", "Please select a port")
            return
            
        try:
            # Parse settings
            baudrate = int(self.baudrate_var.get())
            
            databits_map = {"5": 5, "6": 6, "7": 7, "8": 8}
            databits = databits_map[self.databits_var.get()]
            
            parity_map = {"None": serial.PARITY_NONE, "Even": serial.PARITY_EVEN, "Odd": serial.PARITY_ODD}
            parity = parity_map[self.parity_var.get()]
            
            stopbits_map = {"1": serial.STOPBITS_ONE, "1.5": serial.STOPBITS_ONE_POINT_FIVE, "2": serial.STOPBITS_TWO}
            stopbits = stopbits_map[self.stopbits_var.get()]
            
            # Connect
            self.serial = serial.Serial(
                port=port,
                baudrate=baudrate,
                bytesize=databits,
                parity=parity,
                stopbits=stopbits,
                timeout=0.1
            )
            
            self.connected.set(True)
            self.status_label.config(text=f"Connected to {port}")
            
            # Reset statistics
            self._reset_statistics()
            
        except Exception as e:
            messagebox.showerror("Connection Error", f"Failed to connect: {e}")
            
    def _disconnect(self):
        """Disconnect from serial port"""
        if self.serial:
            self._stop_monitoring()
            self.serial.close()
            self.serial = None
            
        self.connected.set(False)
        self.status_label.config(text="Disconnected")
        
    def _start_monitoring(self):
        """Start monitoring serial data"""
        if not self.connected.get():
            messagebox.showerror("Error", "Not connected to a port")
            return
            
        self.monitoring.set(True)
        self.stats['start_time'] = datetime.now()
        
        # Start monitor thread
        self.monitor_thread = threading.Thread(target=self._monitor_worker)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        
    def _stop_monitoring(self):
        """Clear the monitoring flag.

        The reader loop in _monitor_worker checks this flag on each pass and
        exits once it is False; this method does not wait for the thread.
        """
        self.monitoring.set(False)
        
    def _monitor_worker(self):
        """Background monitoring worker"""
        while self.monitoring.get() and self.connected.get():
            try:
                if self.serial.in_waiting:
                    data = self.serial.read(self.serial.in_waiting)
                    
                    # Add to queue for GUI processing
                    self.data_queue.put({
                        'timestamp': datetime.now(),
                        'direction': 'RX',
                        'data': data
                    })
                    
                    self.stats['bytes_received'] += len(data)
                    
            except Exception as e:
                print(f"Monitor error: {e}")
                
            time.sleep(0.01)
            
    def _process_data_queue(self):
        """Forward queued records to the display, looping forever.

        Each record taken from ``self.data_queue`` is scheduled for
        ``self._display_data`` through ``self.root.after``. The loop has no
        exit condition.
        """
        while True:
            try:
                record = self.data_queue.get(timeout=0.1)
                self.root.after(0, self._display_data, record)
            except queue.Empty:
                # Nothing arrived within the timeout; wait again.
                pass
            except Exception as e:
                print(f"Data processing error: {e}")
                
    def _display_data(self, item):
        """Show one queued record in the data view and, if enabled, log it.

        Args:
            item: Dict with ``'timestamp'`` (datetime), ``'direction'``
                (``'RX'`` or anything else, treated as TX) and ``'data'``
                (bytes).

        The line is tagged ``'rx'`` or ``'tx'``, the view is trimmed to the
        configured maximum number of lines, and the line is appended to the
        session log file while "log to file" is switched on.
        """
        timestamp = item['timestamp']
        direction = item['direction']
        data = item['data']

        # Format display text
        parts = []

        if self.display_timestamps.get():
            # %f yields microseconds; drop the last three digits for milliseconds.
            ts_str = timestamp.strftime("%H:%M:%S.%f")[:-3]
            parts.append(f"[{ts_str}] ")

        parts.append(f"{direction}: ")

        if self.display_hex.get():
            # Upper-case hex with a space between bytes.
            hex_str = data.hex().upper()
            parts.append(' '.join(hex_str[i:i + 2] for i in range(0, len(hex_str), 2)))
        else:
            # repr() keeps control characters visible as escapes.
            parts.append(repr(data.decode('utf-8', errors='ignore')))

        parts.append("\n")
        display_text = ''.join(parts)

        # Tag the text as part of the insert. Tk places text inserted at END
        # *before* the widget's final newline, so a range taken from
        # index(END) before/after the insert starts one line too late and
        # never covers the inserted text.
        tag = 'rx' if direction == 'RX' else 'tx'
        self.data_text.insert(tk.END, display_text, tag)

        # Limit buffer size. A non-numeric setting skips trimming instead of
        # raising on every record.
        try:
            max_lines = int(self.max_lines_var.get())
        except ValueError:
            max_lines = None

        if max_lines is not None:
            lines = int(self.data_text.index('end-1c').split('.')[0])
            if lines > max_lines:
                self.data_text.delete('1.0', f'{lines - max_lines}.0')

        # Auto scroll
        if self.auto_scroll.get():
            self.data_text.see(tk.END)

        # Log to file: open lazily while the option is on, and close the file
        # when it is switched off so logging really stops.
        if self.log_to_file.get():
            if not self.log_file:
                timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
                self.log_file = open(
                    f'serial_log_{timestamp_str}.txt', 'w', encoding='utf-8'
                )
            self.log_file.write(display_text)
            # Flush per line so the log is complete if the program is killed.
            self.log_file.flush()
        elif self.log_file:
            self.log_file.close()
            self.log_file = None
            
    def _send_text(self):
        """Send text data"""
        if not self.connected.get():
            messagebox.showerror("Error", "Not connected")
            return
            
        text = self.send_text_var.get()
        if not text:
            return
            
        # Add line ending
        ending = self.line_ending_var.get()
        if ending != "None":
            ending = ending.replace('\\n', '\n').replace('\\r', '\r')
            text += ending
            
        # Send data
        data = text.encode('utf-8')
        self.serial.write(data)
        
        # Add to display
        self.data_queue.put({
            'timestamp': datetime.now(),
            'direction': 'TX',
            'data': data
        })
        
        self.stats['bytes_sent'] += len(data)
        self.send_text_var.set("")  # Clear input
        
    def _send_multi_text(self):
        """Send the multi-line editor contents line by line in the background.

        Blank lines are skipped. Each remaining line is sent with the
        configured line ending, followed by a pause of the configured line
        delay (milliseconds). An invalid or negative delay is reported with an
        error dialog and nothing is sent. If sending fails part-way (for
        example because the port was closed), the error is printed and the
        remaining lines are dropped.
        """
        if not self.connected.get():
            messagebox.showerror("Error", "Not connected")
            return

        text = self.multi_text.get(1.0, tk.END).strip()
        if not text:
            return

        try:
            delay = int(self.line_delay_var.get()) / 1000.0
        except ValueError:
            messagebox.showerror("Error", "Invalid line delay")
            return
        if delay < 0:
            # time.sleep() rejects negative values.
            messagebox.showerror("Error", "Invalid line delay")
            return

        pending = [line for line in text.split('\n') if line.strip()]

        def send_worker():
            try:
                for line in pending:
                    self._send_line_with_ending(line)
                    time.sleep(delay)
            except Exception as e:
                # Runs off the GUI thread, so report on the console like the
                # other background workers do.
                print(f"Multi-line send error: {e}")

        threading.Thread(target=send_worker, daemon=True).start()
        
    def _send_line_with_ending(self, line):
        """Send single line with line ending"""
        ending = self.line_ending_var.get()
        if ending != "None":
            ending = ending.replace('\\n', '\n').replace('\\r', '\r')
            line += ending
            
        data = line.encode('utf-8')
        self.serial.write(data)
        
        self.data_queue.put({
            'timestamp': datetime.now(),
            'direction': 'TX',
            'data': data
        })
        
        self.stats['bytes_sent'] += len(data)
        
    def _send_hex(self):
        """Send hex data"""
        if not self.connected.get():
            messagebox.showerror("Error", "Not connected")
            return
            
        hex_str = self.hex_var.get().replace(' ', '').replace('0x', '')
        
        try:
            data = bytes.fromhex(hex_str)
            self.serial.write(data)
            
            self.data_queue.put({
                'timestamp': datetime.now(),
                'direction': 'TX',
                'data': data
            })
            
            self.stats['bytes_sent'] += len(data)
            self.hex_var.set("")
            
        except ValueError:
            messagebox.showerror("Error", "Invalid hex data")
            
    def _clear_display(self):
        """Remove all text from the data view."""
        display = self.data_text
        display.delete(1.0, tk.END)
        
    def _save_log(self):
        """Save the current contents of the data view to a file chosen by the user.

        Does nothing when the file dialog is cancelled. The file is written
        as UTF-8 so non-ASCII characters in the view cannot fail on a
        platform whose default encoding lacks them. Success and failure are
        both reported with a dialog.
        """
        filename = filedialog.asksaveasfilename(
            defaultextension=".txt",
            filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
        )

        if not filename:
            return

        try:
            with open(filename, 'w', encoding='utf-8') as f:
                # 'end-1c' leaves out the extra newline Tk always keeps at the
                # end of a Text widget, so the file has no blank last line.
                f.write(self.data_text.get('1.0', 'end-1c'))
            messagebox.showinfo("Success", f"Log saved to {filename}")
        except Exception as e:
            messagebox.showerror("Error", f"Failed to save: {e}")
                
    def _load_send_file(self):
        """Replace the multi-line editor contents with a file chosen by the user.

        Does nothing when the dialog is cancelled. Any failure while reading
        the file or updating the editor is reported with an error dialog.
        """
        chosen_path = filedialog.askopenfilename(
            filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
        )
        if not chosen_path:
            return

        try:
            with open(chosen_path, 'r') as source:
                content = source.read()
            editor = self.multi_text
            editor.delete(1.0, tk.END)
            editor.insert(1.0, content)
        except Exception as e:
            messagebox.showerror("Error", f"Failed to load: {e}")
                
    def _update_font(self):
        """Update font size"""
        try:
            size = int(self.font_size_var.get())
            font = ('Consolas', size)
            self.data_text.config(font=font)
            self.multi_text.config(font=font)
        except ValueError:
            pass
            
    def _update_statistics(self):
        """Rebuild the statistics report and show it in the statistics pane.

        The report lists the session duration and byte rates (once a start
        time is set), the byte counters, and the port settings while a port
        is connected.
        """
        counters = self.stats
        report = ["Communication Statistics\n", "=" * 30 + "\n\n"]

        started = counters['start_time']
        if started:
            duration = datetime.now() - started
            report.append(f"Session Duration: {duration}\n")

            elapsed = duration.total_seconds()
            if elapsed > 0:
                rx_rate = counters['bytes_received'] / elapsed
                tx_rate = counters['bytes_sent'] / elapsed
                report.append(f"RX Rate: {rx_rate:.1f} bytes/sec\n")
                report.append(f"TX Rate: {tx_rate:.1f} bytes/sec\n")

        report.append(f"\nBytes Received: {counters['bytes_received']:,}\n")
        report.append(f"Bytes Sent: {counters['bytes_sent']:,}\n")
        report.append(
            f"Total Bytes: {counters['bytes_received'] + counters['bytes_sent']:,}\n"
        )

        # Port settings are only available while a port is open.
        if self.connected.get() and self.serial:
            port = self.serial
            report.append("\nConnection Info:\n")
            report.append(f"Port: {port.port}\n")
            report.append(f"Baudrate: {port.baudrate}\n")
            report.append(f"Data Bits: {port.bytesize}\n")
            report.append(f"Parity: {port.parity}\n")
            report.append(f"Stop Bits: {port.stopbits}\n")

        pane = self.stats_text
        pane.delete(1.0, tk.END)
        pane.insert(1.0, ''.join(report))
        
    def _reset_statistics(self):
        """Reset statistics"""
        self.stats = {
            'bytes_received': 0,
            'bytes_sent': 0,
            'lines_received': 0,
            'start_time': datetime.now() if self.connected.get() else None
        }
        self._update_statistics()
        
    def _export_statistics(self):
        """Write the current statistics to a JSON file chosen by the user.

        Does nothing when the dialog is cancelled. The start time, when set,
        is stored in ISO 8601 form and an ``export_time`` entry is added.
        Success and failure are both reported with a dialog.
        """
        target_path = filedialog.asksaveasfilename(
            defaultextension=".json",
            filetypes=[("JSON files", "*.json"), ("All files", "*.*")]
        )
        if not target_path:
            return

        try:
            # Work on a copy so the live counters keep their datetime object.
            snapshot = dict(self.stats)
            started = snapshot['start_time']
            if started:
                snapshot['start_time'] = started.isoformat()
            snapshot['export_time'] = datetime.now().isoformat()

            with open(target_path, 'w') as out:
                json.dump(snapshot, out, indent=2)

            messagebox.showinfo("Success", f"Statistics exported to {target_path}")
        except Exception as e:
            messagebox.showerror("Error", f"Failed to export: {e}")
                
    def _update_gui(self):
        """Update GUI periodically"""
        # Auto-update statistics
        if self.connected.get() and self.monitoring.get():
            self._update_statistics()
            
        # Schedule next update
        self.root.after(5000, self._update_gui)  # Update every 5 seconds
        
    def run(self):
        """Run the monitor"""
        # Initialize
        self._refresh_ports()
        
        # Start main loop
        try:
            self.root.mainloop()
        finally:
            # Cleanup
            if self.log_file:
                self.log_file.close()
            if self.serial:
                self.serial.close()

# Launch the monitor GUI only when this file is executed as a script;
# importing it as a module does not open a window.
if __name__ == "__main__":
    monitor = SerialPortMonitor()
    # run() enters the Tk main loop and closes the log file and serial port
    # once that loop ends.
    monitor.run()

Protocol Simulator

import serial
import threading
import time
from datetime import datetime
from typing import Dict, List, Callable, Any
import random
import json

class ProtocolSimulator:
    """Serial device simulator that answers incoming commands.

    Bytes read from the port are split into commands. Each command is
    matched against the registered regex handlers in registration order; the
    first match produces the response, which is written back after a random
    delay. Unmatched commands get a generic ``ERROR`` reply.

    Built-in handlers cover AT commands, JSON documents, a simplified Modbus
    RTU subset and NMEA ``$GPGGA`` sentences. Protocol detection is
    heuristic: text commands are recognised by their terminator, Modbus
    requests by a valid CRC over a fixed-size frame.
    """

    # Terminators for text commands. When two of them start at the same
    # position the earlier entry wins, so b'\r\n' is consumed as one unit.
    _DELIMITERS = (b'\r\n', b'\n', b'\r', b'\0')

    # Size of the fixed-length Modbus RTU requests recognised here: address,
    # function code, two 16-bit fields and a 16-bit CRC.
    _MODBUS_FRAME_LEN = 8

    def __init__(self, port: str, baudrate: int = 9600):
        """Store the port settings and register the default handlers.

        The port itself is not opened until ``connect()`` is called.
        """
        self.port = port
        self.baudrate = baudrate
        self.serial = None
        self.running = False

        # Protocol handlers: compiled pattern -> handler, and
        # handler -> (min_delay, max_delay) in seconds.
        self.handlers = {}
        self.response_delays = {}

        # Default handlers
        self._setup_default_handlers()

        # Statistics
        self.stats = {
            'commands_processed': 0,
            'responses_sent': 0,
            'errors': 0,
            'start_time': None
        }

    def _setup_default_handlers(self):
        """Register the built-in protocol handlers.

        Registration order is the matching priority used by
        ``_handle_command``.
        """
        # AT commands. Anchored at the start so that other traffic which
        # merely contains "AT" (e.g. JSON with "STATUS") is not claimed.
        self.add_handler(r'^\s*AT', self._handle_at_command, delay_range=(0.1, 0.5))

        # JSON handler
        self.add_handler(r'\{.*\}', self._handle_json, delay_range=(0.05, 0.2))

        # Modbus RTU handler (simplified)
        self.add_handler(r'[\x01-\xF7][\x01-\x10].*', self._handle_modbus_rtu, delay_range=(0.01, 0.05))

        # NMEA handler
        self.add_handler(r'\$.*\*[0-9A-F]{2}', self._handle_nmea, delay_range=(0.1, 1.0))

    def connect(self) -> bool:
        """Open the serial port.

        Returns:
            True when the port was opened, False when opening failed (the
            error is printed).
        """
        try:
            self.serial = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=0.1
            )
            print(f"✅ Protocol simulator connected to {self.port}")
            return True
        except Exception as e:
            print(f"❌ Connection failed: {e}")
            return False

    def add_handler(self, pattern: str, handler: Callable, delay_range: tuple = (0.01, 0.1)):
        """Register ``handler`` for commands matching the regex ``pattern``.

        Args:
            pattern: Regular expression, applied with ``search``.
            handler: Callable ``(command_data: bytes, command_str: str)``
                returning the response bytes (empty/None for no response).
            delay_range: ``(min, max)`` seconds; a random delay in this
                range precedes the response. The delay is stored per
                handler, so registering one handler under several patterns
                keeps only the most recent range.
        """
        # Local import: this example's import block does not include ``re``.
        import re
        self.handlers[re.compile(pattern)] = handler
        self.response_delays[handler] = delay_range

    def start_simulation(self):
        """Start the background worker that reads and answers commands.

        Returns:
            False when the port has not been opened, True otherwise. Calling
            this while the simulation is already running starts nothing new.
        """
        if not self.serial:
            return False

        if self.running:
            # A second worker would compete with the first for incoming bytes.
            return True

        self.running = True
        self.stats['start_time'] = datetime.now()

        # Start simulation thread
        self.sim_thread = threading.Thread(target=self._simulation_worker)
        self.sim_thread.daemon = True
        self.sim_thread.start()

        print("🎭 Protocol simulation started")
        return True

    def _simulation_worker(self):
        """Read from the port and process commands until ``running`` is cleared."""
        command_buffer = bytearray()

        while self.running:
            try:
                if self.serial.in_waiting:
                    data = self.serial.read(self.serial.in_waiting)
                    command_buffer.extend(data)

                    # Process complete commands
                    while self._process_command_buffer(command_buffer):
                        pass

            except Exception as e:
                print(f"Simulation error: {e}")
                self.stats['errors'] += 1

            time.sleep(0.001)

    def _has_valid_modbus_frame(self, buffer: bytearray) -> bool:
        """Return True if ``buffer`` starts with a CRC-valid 8-byte Modbus frame."""
        size = self._MODBUS_FRAME_LEN
        if len(buffer) < size:
            return False
        if buffer[0] > 0xF7 or buffer[1] > 0x10:
            return False
        # The CRC is transmitted low byte first.
        received_crc = int.from_bytes(buffer[size - 2:size], 'little')
        return self._calculate_modbus_crc(buffer[:size - 2]) == received_crc

    def _process_command_buffer(self, buffer: bytearray) -> bool:
        """Extract and handle at most one command from the front of ``buffer``.

        The consumed bytes are removed from ``buffer`` in place.

        Returns:
            True when something was consumed (call again for more), False
            when the buffer holds no complete command yet.
        """
        if not buffer:
            return False

        # A CRC-valid Modbus request is taken first. Binary frames routinely
        # contain 0x00, 0x0A or 0x0D bytes, which the delimiter scan below
        # would otherwise mistake for text terminators.
        if self._has_valid_modbus_frame(buffer):
            frame = bytes(buffer[:self._MODBUS_FRAME_LEN])
            del buffer[:self._MODBUS_FRAME_LEN]
            self._handle_command(frame)
            return True

        # Split at the delimiter that occurs EARLIEST in the buffer, so that
        # commands with different terminators are not merged into one.
        cut = None
        for delimiter in self._DELIMITERS:
            position = buffer.find(delimiter)
            if position != -1 and (cut is None or position < cut[0]):
                cut = (position, len(delimiter))

        if cut is not None:
            position, width = cut
            command_data = bytes(buffer[:position])
            del buffer[:position + width]

            # An empty command (e.g. a bare terminator) is just discarded.
            if command_data:
                self._handle_command(command_data)
            return True

        # Check for binary commands (fixed length or special markers)
        if len(buffer) >= self._MODBUS_FRAME_LEN:  # Minimum for Modbus RTU
            if self._handle_binary_command(buffer):
                return True

        return False

    def _handle_command(self, command_data: bytes):
        """Dispatch one received command to the first matching handler.

        Sends ``ERROR`` when no handler matches, and counts the command in
        ``self.stats['commands_processed']`` either way.
        """
        command_data = bytes(command_data)

        # Text view of the command; undecodable bytes are dropped.
        command_str = command_data.decode('utf-8', errors='ignore')

        print(f"📥 Received: {command_str}")

        # latin-1 maps every byte to exactly one character, so byte-oriented
        # patterns (such as the Modbus one) still see bytes >= 0x80 that the
        # UTF-8 view above discards.
        raw_text = command_data.decode('latin-1')

        # Find matching handler
        for pattern, handler in self.handlers.items():
            if pattern.search(command_str) or pattern.search(raw_text):
                self._execute_handler(handler, command_data, command_str)
                break
        else:
            # Send generic error response
            self._send_response(b'ERROR\r\n', delay=0.1)

        self.stats['commands_processed'] += 1

    def _handle_binary_command(self, buffer: bytearray) -> bool:
        """Consume the first 8 bytes as a Modbus RTU frame if they look like one.

        This is the fallback heuristic (address <= 0xF7, function <= 0x10)
        used when no delimiter is present; the CRC is not checked here.

        Returns:
            True when a frame was consumed from ``buffer``.
        """
        size = self._MODBUS_FRAME_LEN
        if len(buffer) >= size and buffer[0] <= 0xF7 and buffer[1] <= 0x10:
            # Assume Modbus RTU frame
            command_data = bytes(buffer[:size])  # Typical frame size
            self._handle_command(command_data)
            del buffer[:size]
            return True

        return False

    def _execute_handler(self, handler: Callable, command_data: bytes, command_str: str):
        """Run ``handler`` after its random response delay.

        Each command is handled on its own daemon thread, so responses can
        be written in a different order than the commands arrived.
        """
        def delayed_handler():
            # Calculate delay
            delay_range = self.response_delays.get(handler, (0.01, 0.1))
            time.sleep(random.uniform(*delay_range))

            try:
                response = handler(command_data, command_str)
                if response:
                    self._send_response(response)
            except Exception as e:
                print(f"Handler error: {e}")
                self.stats['errors'] += 1

        # Execute in separate thread for realistic timing
        threading.Thread(target=delayed_handler, daemon=True).start()

    def _send_response(self, response: bytes, delay: float = 0):
        """Write ``response`` to the port, optionally sleeping ``delay`` seconds first.

        Failures are printed and counted in ``self.stats['errors']``.
        """
        if delay > 0:
            time.sleep(delay)

        try:
            self.serial.write(response)
            self.serial.flush()

            # Decode for display; undecodable bytes are dropped.
            response_str = response.decode('utf-8', errors='ignore')

            print(f"📤 Sent: {response_str.strip()}")
            self.stats['responses_sent'] += 1

        except Exception as e:
            print(f"Send error: {e}")
            self.stats['errors'] += 1

    def _handle_at_command(self, command_data: bytes, command_str: str) -> bytes:
        """Return a canned response for an AT command (case-insensitive)."""
        cmd = command_str.strip().upper()

        if cmd == 'AT':
            return b'OK\r\n'
        elif cmd == 'ATI':
            return b'Protocol Simulator v1.0\r\nOK\r\n'
        elif cmd.startswith('AT+GMI'):
            return b'Simulator Corp\r\nOK\r\n'
        elif cmd.startswith('AT+CSQ'):
            # Random signal quality
            rssi = random.randint(5, 31)
            return f'+CSQ: {rssi},99\r\nOK\r\n'.encode()
        elif cmd.startswith('AT+COPS'):
            return b'+COPS: 0,0,"Simulator Network"\r\nOK\r\n'
        elif cmd.startswith('AT+'):
            # Generic extended command response
            return b'OK\r\n'
        else:
            return b'ERROR\r\n'

    def _handle_json(self, command_data: bytes, command_str: str) -> bytes:
        """Answer a JSON command with a JSON document terminated by CRLF.

        The reply echoes the request and adds simulated ``sensors`` and/or
        ``device_info`` sections when the request contains those keys.
        Unparseable input yields an error document.
        """
        try:
            data = json.loads(command_str)
        except json.JSONDecodeError:
            error_response = {
                'status': 'error',
                'message': 'Invalid JSON'
            }
            return json.dumps(error_response).encode() + b'\r\n'

        # Create response based on command
        response = {
            'status': 'success',
            'timestamp': datetime.now().isoformat(),
            'echo': data
        }

        # Add simulated data based on request
        if 'sensors' in data:
            response['sensors'] = {
                'temperature': round(random.uniform(20, 30), 1),
                'humidity': round(random.uniform(40, 80), 1),
                'pressure': round(random.uniform(1000, 1020), 2)
            }

        if 'device_info' in data:
            response['device_info'] = {
                'name': 'Protocol Simulator',
                'version': '1.0',
                'uptime': int(time.time())
            }

        return json.dumps(response).encode() + b'\r\n'

    def _modbus_exception(self, slave_id: int, function_code: int, exception_code: int) -> bytes:
        """Build a Modbus exception frame (function code with the high bit set)."""
        frame = bytearray([slave_id, function_code | 0x80, exception_code])
        frame.extend(self._calculate_modbus_crc(frame).to_bytes(2, 'little'))
        return bytes(frame)

    def _handle_modbus_rtu(self, command_data: bytes, command_str: str) -> bytes:
        """Answer a Modbus RTU request (simplified).

        Supports function 0x03 (read holding registers, random values) and
        0x01 (read coils, one random byte). A register count outside 1..125
        is answered with exception code 0x03 (illegal data value); such a
        count previously overflowed the one-byte length field and raised.

        Returns:
            The response frame including CRC, or ``b''`` for frames that are
            too short or use an unsupported function code.
        """
        if len(command_data) < 4:
            return b''  # Invalid frame

        slave_id = command_data[0]
        function_code = command_data[1]

        # Simulate responses for common function codes
        if function_code == 0x03:  # Read holding registers
            if len(command_data) < 6:
                return b''

            count = int.from_bytes(command_data[4:6], 'big')
            if not 1 <= count <= 125:
                return self._modbus_exception(slave_id, function_code, 0x03)

            # Header: address, function, payload byte count (2 per register).
            response = bytearray([slave_id, function_code, count * 2])

            # Add random register values
            for _ in range(count):
                response.extend(random.randint(0, 65535).to_bytes(2, 'big'))

            # Add CRC (low byte first)
            response.extend(self._calculate_modbus_crc(response).to_bytes(2, 'little'))
            return bytes(response)

        if function_code == 0x01:  # Read coils
            # Simplified coil response
            response = bytearray([slave_id, function_code, 1, random.randint(0, 255)])
            response.extend(self._calculate_modbus_crc(response).to_bytes(2, 'little'))
            return bytes(response)

        return b''  # Unsupported function

    def _calculate_modbus_crc(self, data: bytes) -> int:
        """Return the Modbus RTU CRC-16 of ``data`` (init 0xFFFF, poly 0xA001)."""
        crc = 0xFFFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 0x0001:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return crc

    def _handle_nmea(self, command_data: bytes, command_str: str) -> bytes:
        """Answer a ``$GPGGA`` sentence with a simulated fix; ignore others.

        Returns:
            A ``$GPGGA`` sentence with checksum and CRLF, or ``b''`` for any
            other sentence type.
        """
        sentence = command_str.strip()

        if not sentence.startswith('$GPGGA'):
            return b''  # Unsupported sentence

        time_str = datetime.now().strftime('%H%M%S.00')

        # Rounded to 6 decimals first so the minutes below stay under 60.
        latitude = round(random.uniform(40, 41), 6)
        longitude = abs(round(random.uniform(-74, -73), 6))

        # Convert to DDMM.MMMM / DDDMM.MMMM format
        lat_deg = int(latitude)
        lat_min = (latitude - lat_deg) * 60
        lat_nmea = f"{lat_deg:02d}{lat_min:07.4f}"

        lon_deg = int(longitude)
        lon_min = (longitude - lon_deg) * 60
        lon_nmea = f"{lon_deg:03d}{lon_min:07.4f}"

        response = f"$GPGGA,{time_str},{lat_nmea},N,{lon_nmea},W,1,08,1.0,545.4,M,46.9,M,,"

        # Checksum: XOR of every character after the leading '$'.
        checksum = 0
        for char in response[1:]:
            checksum ^= ord(char)

        response += f"*{checksum:02X}\r\n"
        return response.encode()

    def add_custom_handler(self, name: str, pattern: str, response_func: Callable):
        """Register a user handler with the default delay range and announce it."""
        self.add_handler(pattern, response_func)
        print(f"Added custom handler: {name}")

    def get_statistics(self) -> Dict:
        """Return a copy of the counters, plus duration and rates once started.

        ``duration`` (seconds) is added when the simulation has a start
        time; the per-second rates are added when that duration is positive.
        """
        stats = self.stats.copy()

        if stats['start_time']:
            duration = datetime.now() - stats['start_time']
            stats['duration'] = duration.total_seconds()

            if stats['duration'] > 0:
                stats['commands_per_second'] = stats['commands_processed'] / stats['duration']
                stats['responses_per_second'] = stats['responses_sent'] / stats['duration']

        return stats

    def stop_simulation(self):
        """Stop the worker thread and wait for it to finish."""
        self.running = False

        # sim_thread only exists once start_simulation() has run.
        if hasattr(self, 'sim_thread'):
            self.sim_thread.join()

        print("🎭 Protocol simulation stopped")

    def close(self):
        """Stop the simulation and close the serial port if it is open."""
        self.stop_simulation()
        if self.serial:
            self.serial.close()

# Example usage with custom handler
def custom_sensor_handler(command_data: bytes, command_str: str) -> bytes:
    """Return a simulated sensor reading for TEMP or HUMID requests.

    ``TEMP`` is checked first, so a command containing both keywords gets a
    temperature reading. Anything else is answered with ``UNKNOWN``.
    """
    wants_temperature = 'TEMP' in command_str
    if wants_temperature:
        reading = round(random.uniform(20, 25), 1)
        return f"TEMP:{reading}C\r\n".encode()

    if 'HUMID' in command_str:
        reading = round(random.uniform(45, 65), 1)
        return f"HUMID:{reading}%\r\n".encode()

    return b'UNKNOWN\r\n'

# Create and run simulator
# NOTE: this example runs at import time (there is no __main__ guard) and
# tries to open the serial port named below.
simulator = ProtocolSimulator('/dev/ttyUSB1', 115200)  # Different port for simulation

# Add custom handler (registered after the built-in ones, so those are
# matched first)
simulator.add_custom_handler('Sensor Commands', r'(TEMP|HUMID)', custom_sensor_handler)

# Only start the simulation when the port could be opened.
if simulator.connect():
    simulator.start_simulation()
    
    try:
        print("Protocol simulator running... (Press Ctrl+C to stop)")
        
        # Show statistics periodically (every 10 seconds)
        while True:
            time.sleep(10)
            stats = simulator.get_statistics()
            print(f"📊 Stats: {stats['commands_processed']} commands, {stats['responses_sent']} responses, {stats['errors']} errors")
            
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to end the loop above.
        pass
    finally:
        # Stops the worker thread and closes the port.
        simulator.close()

Professional debugging tools are essential for reliable PySerial applications. These utilities provide deep insight into communication patterns, timing characteristics, and protocol behavior for robust development.

How is this guide?