
Data Logging & Analysis

Build comprehensive data logging systems with PySerial - capture sensor data in real time, store it in databases, and analyze it for scientific and industrial applications.

Basic Data Logger

Start with a simple file-based logger that can capture and timestamp all serial data with configurable output formats.

import serial
import threading
import time
from datetime import datetime
from pathlib import Path

class SerialDataLogger:
    def __init__(self, port: str, baudrate: int = 9600, log_dir: str = "serial_logs"):
        self.port = port
        self.baudrate = baudrate
        self.log_dir = Path(log_dir)
        self.serial = None
        self.logging = False
        
        # Create log directory
        self.log_dir.mkdir(exist_ok=True)
        
        # Generate log filename with timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.log_file = self.log_dir / f"serial_log_{timestamp}.txt"
        
        # Statistics
        self.stats = {
            'lines_logged': 0,
            'bytes_logged': 0,
            'start_time': None,
            'last_entry_time': None
        }
        
    def connect(self) -> bool:
        """Connect to serial port"""
        try:
            self.serial = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=1
            )
            print(f"✅ Connected to {self.port}")
            print(f"📁 Logging to: {self.log_file}")
            return True
        except Exception as e:
            print(f"❌ Connection failed: {e}")
            return False
            
    def start_logging(self):
        """Start data logging"""
        if not self.serial:
            print("❌ Not connected to serial port")
            return False
            
        if self.logging:
            print("⚠️ Already logging")
            return True
            
        self.logging = True
        self.stats['start_time'] = datetime.now()
        
        # Start logging thread
        self.log_thread = threading.Thread(target=self._logging_worker)
        self.log_thread.daemon = True
        self.log_thread.start()
        
        print("🔄 Data logging started")
        return True
        
    def _logging_worker(self):
        """Background logging worker"""
        with open(self.log_file, 'w', encoding='utf-8') as f:
            # Write header
            f.write(f"# Serial Data Log\n")
            f.write(f"# Port: {self.port}\n")
            f.write(f"# Baudrate: {self.baudrate}\n")
            f.write(f"# Started: {self.stats['start_time']}\n")
            f.write(f"# Format: [TIMESTAMP] DATA\n")
            f.write("#" + "="*50 + "\n\n")
            f.flush()
            
            buffer = ''
            while self.logging:
                try:
                    if self.serial.in_waiting:
                        # Read available data and append to the buffer, since a
                        # read can end in the middle of a line
                        data = self.serial.read(self.serial.in_waiting)
                        buffer += data.decode('utf-8', errors='ignore')
                        
                        # Process complete lines only; keep the remainder buffered
                        lines = buffer.split('\n')
                        buffer = lines.pop()
                        
                        for line in lines:
                            line = line.rstrip('\r')
                            if line.strip():  # Skip empty lines
                                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                                log_entry = f"[{timestamp}] {line}\n"
                                
                                f.write(log_entry)
                                f.flush()  # Ensure data is written immediately
                                
                                # Update statistics
                                self.stats['lines_logged'] += 1
                                self.stats['bytes_logged'] += len(log_entry)
                                self.stats['last_entry_time'] = datetime.now()
                                
                                print(f"📝 {line}")  # Optional: print to console
                                
                except Exception as e:
                    print(f"Logging error: {e}")
                    
                time.sleep(0.01)  # Small delay to prevent CPU spinning
                
    def stop_logging(self):
        """Stop data logging"""
        if not self.logging:
            return
            
        self.logging = False
        
        if hasattr(self, 'log_thread'):
            self.log_thread.join()
            
        # Write summary
        self._write_summary()
        
        print("⏹️ Data logging stopped")
        print(f"📊 Summary: {self.stats['lines_logged']} lines, {self.stats['bytes_logged']} bytes")
        
    def _write_summary(self):
        """Write logging summary to file"""
        try:
            with open(self.log_file, 'a', encoding='utf-8') as f:
                f.write(f"\n# Logging Summary\n")
                f.write(f"# Stopped: {datetime.now()}\n")
                f.write(f"# Lines logged: {self.stats['lines_logged']}\n")
                f.write(f"# Bytes logged: {self.stats['bytes_logged']}\n")
                
                if self.stats['start_time'] and self.stats['last_entry_time']:
                    duration = self.stats['last_entry_time'] - self.stats['start_time']
                    f.write(f"# Duration: {duration}\n")
                    
        except Exception as e:
            print(f"Error writing summary: {e}")
            
    def get_statistics(self):
        """Get logging statistics"""
        stats = self.stats.copy()
        
        if stats['start_time']:
            if self.logging:
                # Still running: measure duration up to now
                stats['duration'] = datetime.now() - stats['start_time']
            elif stats['last_entry_time']:
                # Stopped: measure up to the last logged entry
                stats['duration'] = stats['last_entry_time'] - stats['start_time']
                
            if stats.get('duration') and stats['duration'].total_seconds() > 0:
                stats['lines_per_second'] = stats['lines_logged'] / stats['duration'].total_seconds()
                stats['bytes_per_second'] = stats['bytes_logged'] / stats['duration'].total_seconds()
                
        return stats
        
    def close(self):
        """Close logger and serial connection"""
        self.stop_logging()
        if self.serial:
            self.serial.close()

# Example usage
logger = SerialDataLogger('/dev/ttyUSB0', 115200)

if logger.connect():
    logger.start_logging()
    
    try:
        # Log for 30 seconds
        time.sleep(30)
        
        # Show statistics
        stats = logger.get_statistics()
        print(f"\n📈 Logging Statistics:")
        print(f"   Lines: {stats['lines_logged']}")
        print(f"   Bytes: {stats['bytes_logged']}")
        if 'lines_per_second' in stats:
            print(f"   Rate: {stats['lines_per_second']:.1f} lines/sec")
            
    except KeyboardInterrupt:
        print("\nStopping logger...")
        
    finally:
        logger.close()
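
Reading a log back for offline work is simple because every entry follows the [TIMESTAMP] DATA format written above. A minimal sketch (the file name is hypothetical):

from datetime import datetime

def read_log_entries(log_path: str):
    """Yield (timestamp, data) tuples from a SerialDataLogger file."""
    with open(log_path, encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            # Skip the comment header/summary and blank lines
            if not line or line.startswith('#'):
                continue
            # Each entry looks like: [YYYY-MM-DD HH:MM:SS.mmm] DATA
            ts_str, _, data = line[1:].partition('] ')
            yield datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S.%f'), data

# for ts, data in read_log_entries('serial_logs/serial_log_20240101_120000.txt'):
#     print(ts, data)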

CSV Data Logging

Log parsed fields to CSV with pluggable parsers, so each line of serial data becomes a structured, analysis-ready row.

import csv
import serial
import threading
import time
from datetime import datetime
from typing import List, Dict, Callable, Optional
import re

class SerialCSVLogger:
    def __init__(self, port: str, baudrate: int = 9600, csv_file: Optional[str] = None):
        self.port = port
        self.baudrate = baudrate
        self.serial = None
        self.logging = False
        
        # Generate CSV filename if not provided
        if csv_file is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.csv_file = f"serial_data_{timestamp}.csv"
        else:
            self.csv_file = csv_file
            
        # Data parsing configuration
        self.columns = ['timestamp', 'raw_data']
        self.data_parsers = []
        self.field_separator = ','
        self.decimal_separator = '.'
        
        # Statistics
        self.rows_written = 0
        
    def add_data_parser(self, name: str, parser_func: Callable[[str], Dict]):
        """Add custom data parser"""
        self.data_parsers.append({
            'name': name,
            'parser': parser_func
        })
        
    def set_csv_columns(self, columns: List[str]):
        """Set CSV column headers"""
        self.columns = ['timestamp'] + columns
        
    def connect(self) -> bool:
        """Connect to serial port"""
        try:
            self.serial = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=1
            )
            print(f"✅ Connected to {self.port}")
            print(f"📊 CSV file: {self.csv_file}")
            return True
        except Exception as e:
            print(f"❌ Connection failed: {e}")
            return False
            
    def start_logging(self):
        """Start CSV logging"""
        if not self.serial:
            return False
            
        self.logging = True
        
        # Initialize CSV file with headers
        with open(self.csv_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(self.columns)
            
        # Start logging thread
        self.log_thread = threading.Thread(target=self._csv_logging_worker)
        self.log_thread.daemon = True
        self.log_thread.start()
        
        print("📈 CSV logging started")
        return True
        
    def _csv_logging_worker(self):
        """CSV logging worker"""
        with open(self.csv_file, 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            
            while self.logging:
                try:
                    if self.serial.in_waiting:
                        line = self.serial.readline().decode('utf-8', errors='ignore').strip()
                        
                        if line:
                            # Parse data
                            parsed_data = self._parse_data_line(line)
                            
                            if parsed_data:
                                # Write to CSV
                                row = [datetime.now().isoformat()] + [
                                    parsed_data.get(col, '') for col in self.columns[1:]
                                ]
                                writer.writerow(row)
                                f.flush()
                                
                                self.rows_written += 1
                                
                                if self.rows_written % 100 == 0:
                                    print(f"📊 {self.rows_written} rows logged")
                                    
                except Exception as e:
                    print(f"CSV logging error: {e}")
                    
                time.sleep(0.01)
                
    def _parse_data_line(self, line: str) -> Optional[Dict]:
        """Parse data line using configured parsers"""
        # Always include raw data
        parsed_data = {'raw_data': line}
        
        # Apply custom parsers
        for parser_info in self.data_parsers:
            try:
                result = parser_info['parser'](line)
                if isinstance(result, dict):
                    parsed_data.update(result)
            except Exception as e:
                print(f"Parser '{parser_info['name']}' error: {e}")
                
        # Auto-parse numeric fields if no custom parsers
        if len(self.data_parsers) == 0:
            parsed_data.update(self._auto_parse_numeric(line))
            
        return parsed_data
        
    def _auto_parse_numeric(self, line: str) -> Dict:
        """Automatically parse numeric values from line"""
        data = {}
        
        # Try comma-separated values
        if self.field_separator in line:
            parts = line.split(self.field_separator)
            for i, part in enumerate(parts):
                try:
                    # Try to convert to float
                    value = float(part.strip().replace(self.decimal_separator, '.'))
                    data[f'field_{i+1}'] = value
                except ValueError:
                    data[f'field_{i+1}'] = part.strip()
                    
        # Fall back to extracting bare numbers only if field parsing found nothing
        if not data:
            numbers = re.findall(r'-?\d+\.?\d*', line)
            for i, num in enumerate(numbers):
                try:
                    data[f'number_{i+1}'] = float(num)
                except ValueError:
                    pass
                
        return data
        
    def stop_logging(self):
        """Stop CSV logging"""
        self.logging = False
        if hasattr(self, 'log_thread'):
            self.log_thread.join()
        print(f"📈 CSV logging stopped. {self.rows_written} rows written.")
        
    def close(self):
        """Close logger"""
        self.stop_logging()
        if self.serial:
            self.serial.close()

# Example parsers for common data formats
def temperature_humidity_parser(line: str) -> Dict:
    """Parse temperature and humidity data"""
    # Example: "Temperature: 25.5°C, Humidity: 60.2%"
    data = {}
    
    temp_match = re.search(r'temperature[:\s]+([0-9.-]+)', line, re.IGNORECASE)
    if temp_match:
        data['temperature'] = float(temp_match.group(1))
        
    humid_match = re.search(r'humidity[:\s]+([0-9.-]+)', line, re.IGNORECASE)
    if humid_match:
        data['humidity'] = float(humid_match.group(1))
        
    return data

def nmea_gps_parser(line: str) -> Dict:
    """Parse NMEA GPS sentences"""
    data = {}
    
    if line.startswith('$GPGGA'):
        parts = line.split(',')
        if len(parts) > 9:
            try:
                # Extract latitude, longitude, altitude
                if parts[2] and parts[3]:  # Latitude
                    lat = float(parts[2][:2]) + float(parts[2][2:]) / 60
                    if parts[3] == 'S':
                        lat = -lat
                    data['latitude'] = lat
                    
                if parts[4] and parts[5]:  # Longitude
                    lon = float(parts[4][:3]) + float(parts[4][3:]) / 60
                    if parts[5] == 'W':
                        lon = -lon
                    data['longitude'] = lon
                    
                if parts[9]:  # Altitude
                    data['altitude'] = float(parts[9])
                    
                if parts[7]:  # Number of satellites
                    data['satellites'] = int(parts[7])
                    
            except (ValueError, IndexError):
                pass
                
    return data

def sensor_array_parser(line: str) -> Dict:
    """Parse sensor array data format"""
    # Example: "S1:25.5,S2:60.2,S3:1013.2,S4:15.8"
    data = {}
    
    sensor_matches = re.findall(r'S(\d+):([0-9.-]+)', line)
    for sensor_num, value in sensor_matches:
        data[f'sensor_{sensor_num}'] = float(value)
        
    return data
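
A quick way to sanity-check a parser before wiring it into the logger is to run it on sample lines (hypothetical data):

print(temperature_humidity_parser("Temperature: 25.5C, Humidity: 60.2%"))
# -> {'temperature': 25.5, 'humidity': 60.2}

print(sensor_array_parser("S1:25.5,S2:60.2,S3:1013.2,S4:15.8"))
# -> {'sensor_1': 25.5, 'sensor_2': 60.2, 'sensor_3': 1013.2, 'sensor_4': 15.8}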

# Example usage with custom parsers
csv_logger = SerialCSVLogger('/dev/ttyUSB0', 9600, 'sensor_data.csv')

# Configure columns and parser
csv_logger.set_csv_columns(['raw_data', 'temperature', 'humidity', 'pressure'])
csv_logger.add_data_parser('temp_humidity', temperature_humidity_parser)

if csv_logger.connect():
    csv_logger.start_logging()
    
    try:
        time.sleep(60)  # Log for 1 minute
    except KeyboardInterrupt:
        pass
    finally:
        csv_logger.close()
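
Once a run has finished, the CSV pulls straight into pandas for a quick look; a minimal sketch assuming the sensor_data.csv file and temperature column configured above:

import pandas as pd

df = pd.read_csv('sensor_data.csv', parse_dates=['timestamp'])
print(df['temperature'].describe())  # count, mean, std, min/max summary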

JSON Data Logging

Log entries as structured JSON documents with session metadata, automatic format detection, and periodic flushing to disk.

import json
import os
import serial
import threading
import time
from datetime import datetime
from typing import Any, Dict, Optional
import uuid

class SerialJSONLogger:
    def __init__(self, port: str, baudrate: int = 9600, json_file: Optional[str] = None):
        self.port = port
        self.baudrate = baudrate
        self.serial = None
        self.logging = False
        
        # Generate JSON filename if not provided
        if json_file is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.json_file = f"serial_data_{timestamp}.json"
        else:
            self.json_file = json_file
            
        # Configuration
        self.metadata = {
            'session_id': str(uuid.uuid4()),
            'port': port,
            'baudrate': baudrate,
            'start_time': None,
            'end_time': None
        }
        
        self.data_entries = []
        self.max_entries_in_memory = 1000
        self.auto_flush_interval = 10  # seconds
        
    def connect(self) -> bool:
        """Connect to serial port"""
        try:
            self.serial = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=1
            )
            print(f"✅ Connected to {self.port}")
            print(f"📄 JSON file: {self.json_file}")
            return True
        except Exception as e:
            print(f"❌ Connection failed: {e}")
            return False
            
    def start_logging(self):
        """Start JSON logging"""
        if not self.serial:
            return False
            
        self.logging = True
        self.metadata['start_time'] = datetime.now().isoformat()
        
        # Initialize JSON file
        self._write_initial_json()
        
        # Start logging thread
        self.log_thread = threading.Thread(target=self._json_logging_worker)
        self.log_thread.daemon = True
        self.log_thread.start()
        
        # Start auto-flush thread
        self.flush_thread = threading.Thread(target=self._auto_flush_worker)
        self.flush_thread.daemon = True
        self.flush_thread.start()
        
        print("🗂️ JSON logging started")
        return True
        
    def _write_initial_json(self):
        """Write initial JSON structure"""
        initial_data = {
            'metadata': self.metadata,
            'data': []
        }
        
        with open(self.json_file, 'w', encoding='utf-8') as f:
            json.dump(initial_data, f, indent=2)
            
    def _json_logging_worker(self):
        """JSON logging worker"""
        while self.logging:
            try:
                if self.serial.in_waiting:
                    line = self.serial.readline().decode('utf-8', errors='ignore').strip()
                    
                    if line:
                        # Create data entry
                        entry = {
                            'timestamp': datetime.now().isoformat(),
                            'raw_data': line,
                            'parsed_data': self._parse_structured_data(line),
                            'metadata': {
                                'length': len(line),
                                'encoding': 'utf-8'
                            }
                        }
                        
                        self.data_entries.append(entry)
                        
                        # Flush if memory limit reached
                        if len(self.data_entries) >= self.max_entries_in_memory:
                            self._flush_to_file()
                            
            except Exception as e:
                print(f"JSON logging error: {e}")
                
            time.sleep(0.01)
            
    def _auto_flush_worker(self):
        """Automatic flush worker"""
        while self.logging:
            time.sleep(self.auto_flush_interval)
            if self.data_entries:
                self._flush_to_file()
                
    def _flush_to_file(self):
        """Flush data entries to JSON file"""
        if not self.data_entries:
            return
            
        try:
            # Read existing data
            with open(self.json_file, 'r', encoding='utf-8') as f:
                json_data = json.load(f)
                
            # Add new entries
            json_data['data'].extend(self.data_entries)
            json_data['metadata'] = self.metadata
            
            # Write back to file
            with open(self.json_file, 'w', encoding='utf-8') as f:
                json.dump(json_data, f, indent=2)
                
            print(f"💾 Flushed {len(self.data_entries)} entries to JSON file")
            self.data_entries.clear()
            
        except Exception as e:
            print(f"Error flushing to JSON file: {e}")
            
    def _parse_structured_data(self, line: str) -> Dict[str, Any]:
        """Parse line into structured data"""
        parsed = {}
        
        # Try to parse as JSON first
        try:
            parsed = json.loads(line)
            parsed['format'] = 'json'
            return parsed
        except json.JSONDecodeError:
            pass
            
        # Try key-value pairs
        if '=' in line:
            pairs = line.split(',')
            for pair in pairs:
                if '=' in pair:
                    key, value = pair.split('=', 1)
                    key = key.strip()
                    value = value.strip()
                    
                    # Try to convert to number
                    try:
                        if '.' in value:
                            parsed[key] = float(value)
                        else:
                            parsed[key] = int(value)
                    except ValueError:
                        parsed[key] = value
                        
            if parsed:
                parsed['format'] = 'key_value'
                return parsed
                
        # Try comma-separated values
        if ',' in line:
            values = [v.strip() for v in line.split(',')]
            for i, value in enumerate(values):
                try:
                    if '.' in value:
                        parsed[f'value_{i}'] = float(value)
                    else:
                        parsed[f'value_{i}'] = int(value)
                except ValueError:
                    parsed[f'value_{i}'] = value
                    
            if parsed:
                parsed['format'] = 'csv'
                return parsed
                
        # Default: treat as single value
        try:
            if '.' in line:
                parsed['value'] = float(line)
            else:
                parsed['value'] = int(line)
        except ValueError:
            parsed['value'] = line
            
        parsed['format'] = 'single_value'
        return parsed
        
    def add_custom_metadata(self, key: str, value: Any):
        """Add custom metadata"""
        self.metadata[key] = value
        
    def stop_logging(self):
        """Stop JSON logging"""
        self.logging = False
        
        if hasattr(self, 'log_thread'):
            self.log_thread.join()
        if hasattr(self, 'flush_thread'):
            self.flush_thread.join()
            
        # Final flush
        if self.data_entries:
            self._flush_to_file()
            
        # Update metadata
        self.metadata['end_time'] = datetime.now().isoformat()
        self._update_metadata()
        
        print("🗂️ JSON logging stopped")
        
    def _update_metadata(self):
        """Update metadata in JSON file"""
        try:
            with open(self.json_file, 'r', encoding='utf-8') as f:
                json_data = json.load(f)
                
            json_data['metadata'] = self.metadata
            
            with open(self.json_file, 'w', encoding='utf-8') as f:
                json.dump(json_data, f, indent=2)
                
        except Exception as e:
            print(f"Error updating metadata: {e}")
            
    def get_data_summary(self) -> Dict:
        """Get summary of logged data"""
        try:
            with open(self.json_file, 'r', encoding='utf-8') as f:
                json_data = json.load(f)
                
            total_entries = len(json_data['data'])
            formats = {}
            
            for entry in json_data['data']:
                format_type = entry.get('parsed_data', {}).get('format', 'unknown')
                formats[format_type] = formats.get(format_type, 0) + 1
                
            return {
                'total_entries': total_entries,
                'formats': formats,
                'file_size': os.path.getsize(self.json_file) if os.path.exists(self.json_file) else 0,
                'metadata': json_data['metadata']
            }
            
        except Exception as e:
            return {'error': str(e)}
            
    def close(self):
        """Close logger"""
        self.stop_logging()
        if self.serial:
            self.serial.close()

# Example usage
json_logger = SerialJSONLogger('/dev/ttyUSB0', 9600, 'sensor_data.json')

# Add custom metadata
json_logger.add_custom_metadata('experiment', 'Temperature Monitoring')
json_logger.add_custom_metadata('location', 'Lab Room 101')
json_logger.add_custom_metadata('operator', 'John Doe')

if json_logger.connect():
    json_logger.start_logging()
    
    try:
        time.sleep(60)  # Log for 1 minute
        
        # Get summary
        summary = json_logger.get_data_summary()
        print(f"📊 Data Summary: {summary}")
        
    except KeyboardInterrupt:
        pass
    finally:
        json_logger.close()
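
Because the file always holds a single object with metadata and data keys, reading it back takes only a few lines; a sketch assuming the sensor_data.json file from the example:

import json

with open('sensor_data.json', encoding='utf-8') as f:
    log = json.load(f)

print(log['metadata']['session_id'])
print(len(log['data']), 'entries')
for entry in log['data'][:5]:
    print(entry['timestamp'], entry['raw_data'])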

Database Integration

Store serial data in databases for efficient querying, analysis, and long-term storage with proper indexing and data types.

import serial
import sqlite3
import threading
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import statistics
import json

class SerialDatabaseLogger:
    def __init__(self, port: str, baudrate: int = 9600, db_path: str = 'serial_data.db'):
        self.port = port
        self.baudrate = baudrate
        self.db_path = db_path
        self.serial = None
        self.logging = False
        
        # Initialize database
        self._init_database()
        
        # Session management
        self.session_id = None
        
    def _init_database(self):
        """Initialize SQLite database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Create tables
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS sessions (
                session_id INTEGER PRIMARY KEY AUTOINCREMENT,
                port TEXT NOT NULL,
                baudrate INTEGER NOT NULL,
                start_time DATETIME NOT NULL,
                end_time DATETIME,
                description TEXT,
                metadata TEXT
            )
        ''')
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS raw_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id INTEGER REFERENCES sessions(session_id),
                timestamp DATETIME NOT NULL,
                raw_data TEXT NOT NULL,
                data_length INTEGER,
                checksum TEXT
            )
        ''')
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS parsed_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id INTEGER REFERENCES sessions(session_id),
                timestamp DATETIME NOT NULL,
                data_type TEXT NOT NULL,
                field_name TEXT NOT NULL,
                numeric_value REAL,
                text_value TEXT,
                boolean_value BOOLEAN
            )
        ''')
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id INTEGER REFERENCES sessions(session_id),
                timestamp DATETIME NOT NULL,
                event_type TEXT NOT NULL,
                description TEXT,
                severity INTEGER DEFAULT 1
            )
        ''')
        
        # Create indexes for performance
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_raw_data_timestamp 
            ON raw_data(timestamp)
        ''')
        
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_parsed_data_timestamp 
            ON parsed_data(timestamp)
        ''')
        
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_parsed_data_field 
            ON parsed_data(field_name, timestamp)
        ''')
        
        conn.commit()
        conn.close()
        
    def connect(self, description: str = "") -> bool:
        """Connect to serial port and start new session"""
        try:
            self.serial = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=1
            )
            
            # Create new session
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute('''
                INSERT INTO sessions (port, baudrate, start_time, description)
                VALUES (?, ?, ?, ?)
            ''', (self.port, self.baudrate, datetime.now(), description))
            
            self.session_id = cursor.lastrowid
            conn.commit()
            conn.close()
            
            print(f"✅ Connected to {self.port}")
            print(f"🗄️ Database session {self.session_id} started")
            return True
            
        except Exception as e:
            print(f"❌ Connection failed: {e}")
            return False
            
    def start_logging(self):
        """Start database logging"""
        if not self.serial or not self.session_id:
            return False
            
        self.logging = True
        
        # Start logging thread
        self.log_thread = threading.Thread(target=self._database_logging_worker)
        self.log_thread.daemon = True
        self.log_thread.start()
        
        print("🗄️ Database logging started")
        return True
        
    def _database_logging_worker(self):
        """Database logging worker"""
        batch_size = 100
        batch_raw_data = []
        batch_parsed_data = []
        
        while self.logging:
            try:
                if self.serial.in_waiting:
                    line = self.serial.readline().decode('utf-8', errors='ignore').strip()
                    
                    if line:
                        timestamp = datetime.now()
                        
                        # Prepare raw data entry
                        raw_entry = (
                            self.session_id,
                            timestamp,
                            line,
                            len(line),
                            self._calculate_checksum(line)
                        )
                        batch_raw_data.append(raw_entry)
                        
                        # Parse and prepare structured data
                        parsed_fields = self._parse_line_to_fields(line, timestamp)
                        batch_parsed_data.extend(parsed_fields)
                        
                        # Batch insert when limit reached
                        if len(batch_raw_data) >= batch_size:
                            self._batch_insert(batch_raw_data, batch_parsed_data)
                            batch_raw_data.clear()
                            batch_parsed_data.clear()
                            
            except Exception as e:
                self._log_event('error', f'Logging error: {e}', severity=3)
                
            time.sleep(0.01)
            
        # Final batch insert
        if batch_raw_data or batch_parsed_data:
            self._batch_insert(batch_raw_data, batch_parsed_data)
            
    def _batch_insert(self, raw_data: List, parsed_data: List):
        """Batch insert data to database"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            # Insert raw data
            if raw_data:
                cursor.executemany('''
                    INSERT INTO raw_data (session_id, timestamp, raw_data, data_length, checksum)
                    VALUES (?, ?, ?, ?, ?)
                ''', raw_data)
                
            # Insert parsed data
            if parsed_data:
                cursor.executemany('''
                    INSERT INTO parsed_data (session_id, timestamp, data_type, field_name, 
                                           numeric_value, text_value, boolean_value)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                ''', parsed_data)
                
            conn.commit()
            conn.close()
            
            print(f"💾 Inserted {len(raw_data)} raw, {len(parsed_data)} parsed records")
            
        except Exception as e:
            self._log_event('error', f'Database insert error: {e}', severity=3)
            
    def _parse_line_to_fields(self, line: str, timestamp: datetime) -> List:
        """Parse line into database fields"""
        parsed_entries = []
        
        # Try JSON format
        try:
            data = json.loads(line)
            for key, value in data.items():
                parsed_entries.append(self._create_parsed_entry(
                    timestamp, 'json', key, value
                ))
            return parsed_entries
        except json.JSONDecodeError:
            pass
            
        # Try key=value format
        if '=' in line:
            pairs = line.replace(',', ' ').split()
            for pair in pairs:
                if '=' in pair:
                    key, value = pair.split('=', 1)
                    parsed_entries.append(self._create_parsed_entry(
                        timestamp, 'keyvalue', key.strip(), value.strip()
                    ))
            if parsed_entries:
                return parsed_entries
                
        # Try comma-separated values
        if ',' in line:
            values = [v.strip() for v in line.split(',')]
            for i, value in enumerate(values):
                parsed_entries.append(self._create_parsed_entry(
                    timestamp, 'csv', f'field_{i}', value
                ))
            if parsed_entries:
                return parsed_entries
                
        # Single value
        parsed_entries.append(self._create_parsed_entry(
            timestamp, 'single', 'value', line
        ))
        
        return parsed_entries
        
    def _create_parsed_entry(self, timestamp: datetime, data_type: str, 
                           field_name: str, value: Any) -> tuple:
        """Create parsed data entry tuple"""
        numeric_value = None
        text_value = None
        boolean_value = None
        
        # Determine value type and store appropriately
        if isinstance(value, bool):
            boolean_value = value
        elif isinstance(value, (int, float)):
            numeric_value = float(value)
        else:
            # Try to convert string to number
            try:
                numeric_value = float(value)
            except (ValueError, TypeError):
                # Try boolean
                if str(value).lower() in ('true', 'false', '1', '0', 'on', 'off'):
                    boolean_value = str(value).lower() in ('true', '1', 'on')
                else:
                    text_value = str(value)
                    
        return (
            self.session_id,
            timestamp,
            data_type,
            field_name,
            numeric_value,
            text_value,
            boolean_value
        )
        
    def _calculate_checksum(self, data: str) -> str:
        """Calculate simple checksum for data integrity"""
        return hex(sum(ord(c) for c in data) & 0xFFFF)
        
    def _log_event(self, event_type: str, description: str, severity: int = 1):
        """Log system event"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute('''
                INSERT INTO events (session_id, timestamp, event_type, description, severity)
                VALUES (?, ?, ?, ?, ?)
            ''', (self.session_id, datetime.now(), event_type, description, severity))
            
            conn.commit()
            conn.close()
        except Exception as e:
            print(f"Error logging event: {e}")
            
    def query_data(self, field_name: str, hours: int = 24) -> List[Dict]:
        """Query parsed data for specific field"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Compute the cutoff in Python so it matches the local timestamps
        # stored by datetime.now() (SQLite's datetime('now') returns UTC)
        cutoff = datetime.now() - timedelta(hours=hours)
        
        cursor.execute('''
            SELECT timestamp, numeric_value, text_value, boolean_value
            FROM parsed_data
            WHERE session_id = ? AND field_name = ?
            AND timestamp >= ?
            ORDER BY timestamp
        ''', (self.session_id, field_name, cutoff))
        
        results = []
        for timestamp, num_val, text_val, bool_val in cursor.fetchall():
            value = num_val if num_val is not None else text_val if text_val is not None else bool_val
            results.append({
                'timestamp': timestamp,
                'value': value
            })
            
        conn.close()
        return results
        
    def get_statistics(self, field_name: str, hours: int = 24) -> Dict:
        """Get statistical analysis for numeric field"""
        data = self.query_data(field_name, hours)
        numeric_data = [item['value'] for item in data if isinstance(item['value'], (int, float))]
        
        if not numeric_data:
            return {'error': 'No numeric data found'}
            
        stats = {
            'count': len(numeric_data),
            'min': min(numeric_data),
            'max': max(numeric_data),
            'mean': statistics.mean(numeric_data),
            'median': statistics.median(numeric_data),
            'std_dev': statistics.stdev(numeric_data) if len(numeric_data) > 1 else 0,
            'first_timestamp': data[0]['timestamp'] if data else None,
            'last_timestamp': data[-1]['timestamp'] if data else None
        }
        
        return stats
        
    def export_session_data(self, filename: str, format: str = 'csv'):
        """Export session data to file"""
        conn = sqlite3.connect(self.db_path)
        
        if format.lower() == 'csv':
            import csv
            
            cursor = conn.cursor()
            cursor.execute('''
                SELECT r.timestamp, r.raw_data, p.field_name, p.numeric_value, p.text_value, p.boolean_value
                FROM raw_data r
                LEFT JOIN parsed_data p ON r.timestamp = p.timestamp
                WHERE r.session_id = ?
                ORDER BY r.timestamp
            ''', (self.session_id,))
            
            with open(filename, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow(['Timestamp', 'Raw Data', 'Field Name', 'Numeric Value', 'Text Value', 'Boolean Value'])
                writer.writerows(cursor.fetchall())
                
        elif format.lower() == 'json':
            cursor = conn.cursor()
            cursor.execute('''
                SELECT timestamp, raw_data FROM raw_data WHERE session_id = ? ORDER BY timestamp
            ''', (self.session_id,))
            
            data = [{'timestamp': ts, 'raw_data': raw} for ts, raw in cursor.fetchall()]
            
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, default=str)
                
        conn.close()
        print(f"📤 Session data exported to {filename}")
        
    def stop_logging(self):
        """Stop database logging"""
        self.logging = False
        
        if hasattr(self, 'log_thread'):
            self.log_thread.join()
            
        # Update session end time
        if self.session_id:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute('''
                UPDATE sessions SET end_time = ? WHERE session_id = ?
            ''', (datetime.now(), self.session_id))
            
            conn.commit()
            conn.close()
            
        self._log_event('info', 'Logging session ended')
        print("🗄️ Database logging stopped")
        
    def close(self):
        """Close logger and connection"""
        self.stop_logging()
        if self.serial:
            self.serial.close()

# Example usage
db_logger = SerialDatabaseLogger('/dev/ttyUSB0', 9600, 'sensor_data.db')

if db_logger.connect("Temperature and Humidity Monitoring Experiment"):
    db_logger.start_logging()
    
    try:
        time.sleep(120)  # Log for 2 minutes
        
        # Query data
        temp_data = db_logger.query_data('temperature', hours=1)
        print(f"📊 Temperature readings: {len(temp_data)}")
        
        # Get statistics
        if temp_data:
            stats = db_logger.get_statistics('temperature', hours=1)
            print(f"📈 Temperature stats: Min={stats['min']:.1f}, Max={stats['max']:.1f}, Mean={stats['mean']:.1f}")
            
        # Export data
        db_logger.export_session_data('session_data.csv', 'csv')
        
    except KeyboardInterrupt:
        pass
    finally:
        db_logger.close()
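
Sessions accumulate in the same database file, so earlier runs can be listed with a plain query against the sessions table created above; a minimal sketch:

import sqlite3

conn = sqlite3.connect('sensor_data.db')
cursor = conn.cursor()
cursor.execute('''
    SELECT session_id, port, start_time, end_time, description
    FROM sessions ORDER BY start_time DESC
''')
for session in cursor.fetchall():
    print(session)
conn.close()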

Real-time Analysis and Visualization

Live Data Dashboard

Build a tkinter dashboard that shows incoming serial data, a live matplotlib graph, and rolling statistics side by side.

import serial
import tkinter as tk
from tkinter import ttk
from typing import Optional
import threading
import time
from datetime import datetime, timedelta
import queue
from collections import deque
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.animation import FuncAnimation
import numpy as np

class RealTimeDataDashboard:
    def __init__(self, serial_port: str, baudrate: int = 9600):
        self.port = serial_port
        self.baudrate = baudrate
        self.serial = None
        
        # Data storage
        self.data_queue = queue.Queue()
        self.time_series_data = {
            'timestamps': deque(maxlen=1000),
            'values': deque(maxlen=1000)
        }
        
        # GUI setup
        self.root = tk.Tk()
        self.root.title("Real-time Serial Data Dashboard")
        self.root.geometry("1200x800")
        
        # Status variables
        self.connected = tk.BooleanVar(value=False)
        self.logging = tk.BooleanVar(value=False)
        self.total_messages = tk.IntVar(value=0)
        self.messages_per_second = tk.DoubleVar(value=0.0)
        
        # Create interface
        self._create_interface()
        
        # Data processing
        self.data_processor = threading.Thread(target=self._process_data_queue)
        self.data_processor.daemon = True
        self.data_processor.start()
        
    def _create_interface(self):
        """Create dashboard interface"""
        # Control panel
        control_frame = ttk.LabelFrame(self.root, text="Control Panel")
        control_frame.pack(fill='x', padx=5, pady=5)
        
        # Connection controls
        conn_frame = ttk.Frame(control_frame)
        conn_frame.pack(fill='x', padx=5, pady=5)
        
        ttk.Button(conn_frame, text="Connect", command=self._connect).pack(side='left', padx=5)
        ttk.Button(conn_frame, text="Disconnect", command=self._disconnect).pack(side='left', padx=5)
        ttk.Button(conn_frame, text="Start Logging", command=self._start_logging).pack(side='left', padx=5)
        ttk.Button(conn_frame, text="Stop Logging", command=self._stop_logging).pack(side='left', padx=5)
        ttk.Button(conn_frame, text="Clear Data", command=self._clear_data).pack(side='left', padx=5)
        
        # Status display
        status_frame = ttk.LabelFrame(self.root, text="Status")
        status_frame.pack(fill='x', padx=5, pady=5)
        
        status_grid = ttk.Frame(status_frame)
        status_grid.pack(fill='x', padx=5, pady=5)
        
        ttk.Label(status_grid, text="Connected:").grid(row=0, column=0, sticky='w')
        ttk.Label(status_grid, textvariable=self.connected).grid(row=0, column=1, sticky='w')
        
        ttk.Label(status_grid, text="Logging:").grid(row=0, column=2, sticky='w', padx=(20,0))
        ttk.Label(status_grid, textvariable=self.logging).grid(row=0, column=3, sticky='w')
        
        ttk.Label(status_grid, text="Total Messages:").grid(row=1, column=0, sticky='w')
        ttk.Label(status_grid, textvariable=self.total_messages).grid(row=1, column=1, sticky='w')
        
        ttk.Label(status_grid, text="Messages/sec:").grid(row=1, column=2, sticky='w', padx=(20,0))
        ttk.Label(status_grid, textvariable=self.messages_per_second).grid(row=1, column=3, sticky='w')
        
        # Data display
        data_notebook = ttk.Notebook(self.root)
        data_notebook.pack(fill='both', expand=True, padx=5, pady=5)
        
        # Raw data tab
        raw_frame = ttk.Frame(data_notebook)
        data_notebook.add(raw_frame, text="Raw Data")
        
        self.raw_text = tk.Text(raw_frame, height=15, wrap=tk.WORD)
        raw_scrollbar = ttk.Scrollbar(raw_frame, orient="vertical", command=self.raw_text.yview)
        self.raw_text.configure(yscrollcommand=raw_scrollbar.set)
        
        self.raw_text.pack(side="left", fill="both", expand=True)
        raw_scrollbar.pack(side="right", fill="y")
        
        # Graph tab
        graph_frame = ttk.Frame(data_notebook)
        data_notebook.add(graph_frame, text="Real-time Graph")
        
        # Create matplotlib figure
        self.fig, self.ax = plt.subplots(figsize=(10, 6))
        self.ax.set_title("Real-time Data")
        self.ax.set_xlabel("Time")
        self.ax.set_ylabel("Value")
        self.ax.grid(True)
        
        self.canvas = FigureCanvasTkAgg(self.fig, graph_frame)
        self.canvas.get_tk_widget().pack(fill='both', expand=True)
        
        # Start animation
        self.animation = FuncAnimation(
            self.fig, self._update_graph, interval=100, blit=False
        )
        
        # Statistics tab
        stats_frame = ttk.Frame(data_notebook)
        data_notebook.add(stats_frame, text="Statistics")
        
        self.stats_text = tk.Text(stats_frame, height=15, wrap=tk.WORD)
        stats_scrollbar = ttk.Scrollbar(stats_frame, orient="vertical", command=self.stats_text.yview)
        self.stats_text.configure(yscrollcommand=stats_scrollbar.set)
        
        self.stats_text.pack(side="left", fill="both", expand=True)
        stats_scrollbar.pack(side="right", fill="y")
        
        # Update statistics every 5 seconds
        self.root.after(5000, self._update_statistics)
        
    def _connect(self):
        """Connect to serial port"""
        try:
            self.serial = serial.Serial(self.port, self.baudrate, timeout=1)
            self.connected.set(True)
            print(f"✅ Connected to {self.port}")
        except Exception as e:
            print(f"❌ Connection failed: {e}")
            
    def _disconnect(self):
        """Disconnect from serial port"""
        if self.serial:
            self.serial.close()
            self.serial = None
        self.connected.set(False)
        self.logging.set(False)
        print("🔌 Disconnected")
        
    def _start_logging(self):
        """Start data logging"""
        if not self.serial:
            print("❌ Not connected")
            return
            
        self.logging.set(True)
        self.log_thread = threading.Thread(target=self._logging_worker)
        self.log_thread.daemon = True
        self.log_thread.start()
        print("🔄 Logging started")
        
    def _stop_logging(self):
        """Stop data logging"""
        self.logging.set(False)
        print("⏹️ Logging stopped")
        
    def _clear_data(self):
        """Clear all data"""
        self.time_series_data['timestamps'].clear()
        self.time_series_data['values'].clear()
        self.total_messages.set(0)
        
        # Clear text widgets
        self.raw_text.delete(1.0, tk.END)
        self.stats_text.delete(1.0, tk.END)
        
        print("🗑️ Data cleared")
        
    def _logging_worker(self):
        """Background logging worker"""
        message_count = 0
        start_time = time.time()
        
        while self.logging.get() and self.serial:
            try:
                if self.serial.in_waiting:
                    line = self.serial.readline().decode('utf-8', errors='ignore').strip()
                    
                    if line:
                        # Add to queue for processing
                        self.data_queue.put({
                            'timestamp': datetime.now(),
                            'data': line
                        })
                        
                        message_count += 1
                        
                        # Update rate calculation
                        elapsed = time.time() - start_time
                        if elapsed >= 1.0:
                            rate = message_count / elapsed
                            self.messages_per_second.set(round(rate, 2))
                            message_count = 0
                            start_time = time.time()
                            
            except Exception as e:
                print(f"Logging error: {e}")
                
            time.sleep(0.001)
            
    def _process_data_queue(self):
        """Process data from queue"""
        while True:
            try:
                # Get data from queue
                data_item = self.data_queue.get(timeout=1)
                
                timestamp = data_item['timestamp']
                raw_data = data_item['data']
                
                # Update total count
                current_total = self.total_messages.get()
                self.total_messages.set(current_total + 1)
                
                # Add to raw text display (GUI thread safe)
                self.root.after(0, self._add_to_raw_display, timestamp, raw_data)
                
                # Try to extract numeric value for graphing
                numeric_value = self._extract_numeric_value(raw_data)
                if numeric_value is not None:
                    self.time_series_data['timestamps'].append(timestamp)
                    self.time_series_data['values'].append(numeric_value)
                    
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Data processing error: {e}")
                
    def _add_to_raw_display(self, timestamp, data):
        """Add data to raw text display (GUI thread)"""
        self.raw_text.insert(tk.END, f"[{timestamp.strftime('%H:%M:%S.%f')[:-3]}] {data}\n")
        
        # Auto-scroll to bottom
        self.raw_text.see(tk.END)
        
        # Limit text size (keep last 1000 lines)
        lines = int(self.raw_text.index('end-1c').split('.')[0])
        if lines > 1000:
            self.raw_text.delete(1.0, f"{lines-1000}.0")
            
    def _extract_numeric_value(self, data: str) -> Optional[float]:
        """Extract numeric value from data string"""
        import re
        
        # Try the specific patterns first; the bare-number pattern is the
        # fallback, otherwise it would always match before the others
        patterns = [
            r'value[:\s]*([0-9.-]+)',  # "value: 123.45"
            r'temp[:\s]*([0-9.-]+)',   # "temp: 25.5"
            r'([0-9.-]+)°?[CF]?',      # Temperature readings
            r'(\d+\.?\d*)',            # Any number (fallback)
        ]
        
        for pattern in patterns:
            match = re.search(pattern, data, re.IGNORECASE)
            if match:
                try:
                    return float(match.group(1))
                except ValueError:
                    continue
                    
        return None
        
    def _update_graph(self, frame):
        """Update real-time graph"""
        if not self.time_series_data['timestamps']:
            return
            
        # Clear and redraw
        self.ax.clear()
        
        # Convert timestamps to matplotlib format
        timestamps = list(self.time_series_data['timestamps'])
        values = list(self.time_series_data['values'])
        
        if timestamps and values:
            # Plot data
            self.ax.plot(timestamps, values, 'b-', linewidth=1, alpha=0.7)
            self.ax.scatter(timestamps[-10:], values[-10:], color='red', s=20, alpha=0.8)
            
            # Format x-axis
            self.ax.set_xlabel("Time")
            self.ax.set_ylabel("Value")
            self.ax.set_title(f"Real-time Data ({len(values)} points)")
            self.ax.grid(True, alpha=0.3)
            
            # Auto-scale
            self.ax.relim()
            self.ax.autoscale_view()
            
        # Redraw
        self.canvas.draw()
        
    def _update_statistics(self):
        """Update statistics display"""
        if not self.time_series_data['values']:
            self.root.after(5000, self._update_statistics)
            return
            
        values = list(self.time_series_data['values'])
        timestamps = list(self.time_series_data['timestamps'])
        
        # Calculate statistics
        stats_text = "Real-time Data Statistics\n"
        stats_text += "=" * 30 + "\n\n"
        
        stats_text += f"Data Points: {len(values)}\n"
        stats_text += f"Time Range: {timestamps[0].strftime('%H:%M:%S')} - {timestamps[-1].strftime('%H:%M:%S')}\n\n"
        
        if values:
            stats_text += f"Current Value: {values[-1]:.2f}\n"
            stats_text += f"Minimum: {min(values):.2f}\n"
            stats_text += f"Maximum: {max(values):.2f}\n"
            stats_text += f"Average: {sum(values)/len(values):.2f}\n"
            
            # Calculate standard deviation
            mean = sum(values) / len(values)
            variance = sum((x - mean) ** 2 for x in values) / len(values)
            std_dev = variance ** 0.5
            stats_text += f"Std Deviation: {std_dev:.2f}\n"
            
            # Recent trend (last 20 points)
            if len(values) >= 20:
                recent_values = values[-20:]
                trend = recent_values[-1] - recent_values[0]
                stats_text += f"Recent Trend: {'+' if trend > 0 else ''}{trend:.2f}\n"
                
        # Update display
        self.stats_text.delete(1.0, tk.END)
        self.stats_text.insert(tk.END, stats_text)
        
        # Schedule next update
        self.root.after(5000, self._update_statistics)
        
    def run(self):
        """Run the dashboard"""
        print(f"🎛️ Starting Real-time Data Dashboard")
        print(f"   Port: {self.port}")
        print(f"   Baudrate: {self.baudrate}")
        
        try:
            self.root.mainloop()
        except KeyboardInterrupt:
            pass
        finally:
            if self.serial:
                self.serial.close()

# Example usage
dashboard = RealTimeDataDashboard('/dev/ttyUSB0', 9600)
dashboard.run()
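
If matplotlib selects a non-Tk backend on your system, the embedded canvas may fail to render; forcing the TkAgg backend before pyplot is imported is a common workaround (optional, depending on your setup):

import matplotlib
matplotlib.use('TkAgg')  # must run before "import matplotlib.pyplot"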

Data Analysis Tools

Load logged sessions into pandas for statistics, anomaly detection, moving averages, correlation, and trend analysis.

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
from scipy import signal
from scipy.stats import pearsonr
import sqlite3

class SerialDataAnalyzer:
    def __init__(self, db_path: str = 'serial_data.db'):
        self.db_path = db_path
        
    def load_session_data(self, session_id: int) -> pd.DataFrame:
        """Load session data into pandas DataFrame"""
        conn = sqlite3.connect(self.db_path)
        
        query = '''
            SELECT p.timestamp, p.field_name, p.numeric_value, p.text_value, p.boolean_value
            FROM parsed_data p
            WHERE p.session_id = ?
            ORDER BY p.timestamp
        '''
        
        df = pd.read_sql_query(query, conn, params=(session_id,))
        conn.close()
        
        # Convert timestamp to datetime
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        
        # Combine value columns
        df['value'] = df['numeric_value'].fillna(
            df['text_value'].fillna(df['boolean_value'])
        )
        
        return df
        
    def pivot_sensor_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Pivot data to have sensors as columns"""
        pivot_df = df.pivot_table(
            index='timestamp',
            columns='field_name',
            values='value',
            aggfunc='first'
        )
        
        # Forward-fill missing values (fillna(method=...) is deprecated)
        pivot_df = pivot_df.ffill()
        
        return pivot_df
        
    def basic_statistics(self, df: pd.DataFrame) -> dict:
        """Calculate basic statistics for all numeric columns"""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        stats = {}
        
        for col in numeric_cols:
            if col in df.columns:
                data = df[col].dropna()
                if len(data) > 0:
                    stats[col] = {
                        'count': len(data),
                        'mean': data.mean(),
                        'std': data.std(),
                        'min': data.min(),
                        'max': data.max(),
                        'median': data.median(),
                        'q25': data.quantile(0.25),
                        'q75': data.quantile(0.75)
                    }
                    
        return stats
        
    def detect_anomalies(self, df: pd.DataFrame, column: str, method: str = 'zscore', threshold: float = 3.0) -> pd.DataFrame:
        """Detect anomalies in data"""
        if column not in df.columns:
            return pd.DataFrame()
            
        # Keep only rows where the column has a value so boolean masks align with the frame
        valid = df.dropna(subset=[column])
        data = valid[column]
        
        if data.empty:
            return pd.DataFrame()
            
        if method == 'zscore':
            std = data.std()
            if std == 0:
                return pd.DataFrame()  # A constant signal has no z-score outliers
            z_scores = np.abs((data - data.mean()) / std)
            anomalies = valid[z_scores > threshold]
            
        elif method == 'iqr':
            Q1 = data.quantile(0.25)
            Q3 = data.quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            anomalies = valid[(data < lower_bound) | (data > upper_bound)]
            
        elif method == 'isolation_forest':
            from sklearn.ensemble import IsolationForest
            
            iso_forest = IsolationForest(contamination=0.1, random_state=42)
            outlier_labels = iso_forest.fit_predict(data.values.reshape(-1, 1))
            anomalies = valid[outlier_labels == -1]
            
        else:
            raise ValueError(f"Unknown anomaly detection method: {method}")
            
        return anomalies
        
    def calculate_moving_averages(self, df: pd.DataFrame, windows: tuple = (5, 10, 20)) -> pd.DataFrame:
        """Calculate moving averages for numeric columns"""
        result_df = df.copy()
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        
        for col in numeric_cols:
            for window in windows:
                result_df[f'{col}_ma{window}'] = df[col].rolling(window=window, min_periods=1).mean()
                
        return result_df
        
    def correlation_analysis(self, df: pd.DataFrame) -> pd.DataFrame:
        """Calculate correlation matrix for numeric columns"""
        numeric_df = df.select_dtypes(include=[np.number])
        correlation_matrix = numeric_df.corr()
        
        return correlation_matrix
        
    def trend_analysis(self, df: pd.DataFrame, column: str) -> dict:
        """Analyze trend in time series data"""
        if column not in df.columns:
            return {}
            
        # Ensure data is sorted by timestamp
        df_sorted = df.sort_values('timestamp')
        data = df_sorted[column].dropna()
        
        if len(data) < 2:
            return {'error': 'Insufficient data points'}
            
        # Calculate trend using linear regression
        x = np.arange(len(data))
        coeffs = np.polyfit(x, data, 1)
        trend_slope = coeffs[0]
        
        # Calculate trend strength (R-squared); guard against constant data
        y_pred = np.polyval(coeffs, x)
        ss_res = np.sum((data - y_pred) ** 2)
        ss_tot = np.sum((data - np.mean(data)) ** 2)
        r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0.0
        
        # Determine trend direction
        if abs(trend_slope) < 0.001:
            trend_direction = 'stable'
        elif trend_slope > 0:
            trend_direction = 'increasing'
        else:
            trend_direction = 'decreasing'
            
        return {
            'slope': trend_slope,
            'r_squared': r_squared,
            'direction': trend_direction,
            'start_value': data.iloc[0],
            'end_value': data.iloc[-1],
            'total_change': data.iloc[-1] - data.iloc[0],
            'percent_change': ((data.iloc[-1] - data.iloc[0]) / data.iloc[0]) * 100 if data.iloc[0] != 0 else 0
        }
        
    def frequency_analysis(self, df: pd.DataFrame, column: str, sampling_rate: float = 1.0) -> dict:
        """Perform frequency domain analysis"""
        if column not in df.columns:
            return {}
            
        data = df[column].dropna()
        
        if len(data) < 4:
            return {'error': 'Insufficient data for frequency analysis'}
            
        # Remove DC component
        data_centered = data - data.mean()
        
        # Apply FFT
        fft_values = np.fft.fft(data_centered)
        frequencies = np.fft.fftfreq(len(data_centered), 1/sampling_rate)
        
        # Get magnitude spectrum (positive frequencies only)
        n = len(data_centered)
        magnitude_spectrum = np.abs(fft_values[:n//2])
        frequency_bins = frequencies[:n//2]
        
        # Find dominant frequency
        if len(magnitude_spectrum) > 1:
            dominant_freq_idx = np.argmax(magnitude_spectrum[1:]) + 1  # Skip DC component
            dominant_frequency = frequency_bins[dominant_freq_idx]
            dominant_magnitude = magnitude_spectrum[dominant_freq_idx]
        else:
            dominant_frequency = 0
            dominant_magnitude = 0
            
        return {
            'dominant_frequency': dominant_frequency,
            'dominant_magnitude': dominant_magnitude,
            'frequency_bins': frequency_bins,
            'magnitude_spectrum': magnitude_spectrum
        }
        
    def generate_report(self, session_id: int, output_file: str = None) -> str:
        """Generate comprehensive analysis report"""
        # Load data
        df = self.load_session_data(session_id)
        
        if df.empty:
            return "No data found for session"
            
        # Pivot data
        pivot_df = self.pivot_sensor_data(df)
        
        report = []
        report.append("Serial Data Analysis Report")
        report.append("=" * 50)
        report.append(f"Session ID: {session_id}")
        report.append(f"Analysis Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"Data Points: {len(df)}")
        report.append(f"Time Range: {df['timestamp'].min()} to {df['timestamp'].max()}")
        report.append(f"Duration: {df['timestamp'].max() - df['timestamp'].min()}")
        report.append("")
        
        # Basic statistics
        stats = self.basic_statistics(pivot_df)
        if stats:
            report.append("Basic Statistics:")
            report.append("-" * 20)
            for field, field_stats in stats.items():
                report.append(f"\n{field}:")
                for stat_name, value in field_stats.items():
                    report.append(f"  {stat_name}: {value:.3f}")
                    
        # Correlation analysis
        if len(pivot_df.columns) > 1:
            corr_matrix = self.correlation_analysis(pivot_df)
            report.append(f"\n\nCorrelation Matrix:")
            report.append("-" * 20)
            report.append(str(corr_matrix.round(3)))
            
        # Trend analysis for each numeric field
        report.append(f"\n\nTrend Analysis:")
        report.append("-" * 20)
        
        for col in pivot_df.select_dtypes(include=[np.number]).columns:
            trend_info = self.trend_analysis(pivot_df.reset_index(), col)
            if 'error' not in trend_info:
                report.append(f"\n{col}:")
                report.append(f"  Direction: {trend_info['direction']}")
                report.append(f"  Slope: {trend_info['slope']:.6f}")
                report.append(f"  R-squared: {trend_info['r_squared']:.3f}")
                report.append(f"  Total Change: {trend_info['total_change']:.3f}")
                report.append(f"  Percent Change: {trend_info['percent_change']:.2f}%")
                
        # Anomaly detection
        report.append(f"\n\nAnomaly Detection:")
        report.append("-" * 20)
        
        for col in pivot_df.select_dtypes(include=[np.number]).columns:
            anomalies = self.detect_anomalies(pivot_df.reset_index(), col)
            report.append(f"\n{col}: {len(anomalies)} anomalies detected")
            
        report_text = "\n".join(report)
        
        # Save to file if requested
        if output_file:
            with open(output_file, 'w') as f:
                f.write(report_text)
            print(f"📄 Report saved to {output_file}")
            
        return report_text
        
    def create_visualizations(self, session_id: int, save_plots: bool = True):
        """Create comprehensive visualizations"""
        # Load and pivot data
        df = self.load_session_data(session_id)
        pivot_df = self.pivot_sensor_data(df)
        
        if pivot_df.empty:
            print("No data to visualize")
            return
            
        numeric_cols = pivot_df.select_dtypes(include=[np.number]).columns
        n_cols = len(numeric_cols)
        
        if n_cols == 0:
            print("No numeric data to visualize")
            return
            
        # Set up subplots
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle(f'Serial Data Analysis - Session {session_id}', fontsize=16)
        
        # Time series plot
        ax1 = axes[0, 0]
        for col in numeric_cols[:5]:  # Limit to first 5 columns
            ax1.plot(pivot_df.index, pivot_df[col], label=col, alpha=0.8)
        ax1.set_title('Time Series Data')
        ax1.set_xlabel('Time')
        ax1.set_ylabel('Values')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # Distribution plots
        ax2 = axes[0, 1]
        if n_cols >= 1:
            first_col = numeric_cols[0]
            ax2.hist(pivot_df[first_col].dropna(), bins=50, alpha=0.7, edgecolor='black')
            ax2.set_title(f'Distribution - {first_col}')
            ax2.set_xlabel('Value')
            ax2.set_ylabel('Frequency')
            ax2.grid(True, alpha=0.3)
            
        # Correlation heatmap
        ax3 = axes[1, 0]
        if n_cols > 1:
            corr_matrix = self.correlation_analysis(pivot_df)
            sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0, ax=ax3)
            ax3.set_title('Correlation Heatmap')
        else:
            ax3.text(0.5, 0.5, 'Insufficient data\nfor correlation analysis', 
                    transform=ax3.transAxes, ha='center', va='center')
            ax3.set_title('Correlation Heatmap')
            
        # Box plot for outlier detection
        ax4 = axes[1, 1]
        if n_cols >= 1:
            box_data = [pivot_df[col].dropna() for col in numeric_cols[:5]]
            ax4.boxplot(box_data, labels=[col[:10] for col in numeric_cols[:5]])
            ax4.set_title('Box Plot (Outlier Detection)')
            ax4.set_ylabel('Values')
            ax4.tick_params(axis='x', rotation=45)
            
        plt.tight_layout()
        
        if save_plots:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f'serial_analysis_session_{session_id}_{timestamp}.png'
            plt.savefig(filename, dpi=300, bbox_inches='tight')
            print(f"📊 Visualizations saved to {filename}")
            
        plt.show()

# Example usage
analyzer = SerialDataAnalyzer('sensor_data.db')

# Generate report for session 1
report = analyzer.generate_report(1, 'analysis_report.txt')
print((report[:500] + "...") if len(report) > 500 else report)

# Create visualizations
analyzer.create_visualizations(1, save_plots=True)
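
The report and visualization helpers are shown above; the other analysis methods can be called directly on a loaded session. A brief sketch, assuming the session contains a hypothetical 'temperature' field sampled at roughly 1 Hz:

# Load a session and pivot it into one column per sensor field
df = analyzer.load_session_data(1)
pivot_df = analyzer.pivot_sensor_data(df)

# Z-score outliers in the hypothetical 'temperature' field
anomalies = analyzer.detect_anomalies(pivot_df.reset_index(), 'temperature')
print(f"Anomalies detected: {len(anomalies)}")

# Smoothed copies of every numeric column (5- and 20-sample windows)
smoothed = analyzer.calculate_moving_averages(pivot_df, windows=(5, 20))

# Trend direction and dominant signal frequency (sampling_rate in Hz)
trend = analyzer.trend_analysis(pivot_df.reset_index(), 'temperature')
freq = analyzer.frequency_analysis(pivot_df.reset_index(), 'temperature', sampling_rate=1.0)
print(f"Trend: {trend.get('direction', 'n/a')}")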

Comprehensive data logging enables powerful analysis of serial data streams. From simple file logging through real-time dashboards to offline statistical analysis, these tools cover the full pipeline for scientific and industrial data collection.
