Data Logging & Analysis
Complete guide to serial data logging with PySerial - capture sensor data, store in databases, and build real-time analysis systems
This guide walks through building data logging systems with PySerial: capturing sensor data in real time, storing it in files and databases, and analyzing it for scientific and industrial applications.
Basic Data Logger
Start with a simple file-based logger that captures and timestamps every line received over the serial port.
import serial
import threading
import time
from datetime import datetime
from pathlib import Path
import os
class SerialDataLogger:
def __init__(self, port: str, baudrate: int = 9600, log_dir: str = "serial_logs"):
self.port = port
self.baudrate = baudrate
self.log_dir = Path(log_dir)
self.serial = None
self.logging = False
# Create log directory
self.log_dir.mkdir(exist_ok=True)
# Generate log filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.log_file = self.log_dir / f"serial_log_{timestamp}.txt"
# Statistics
self.stats = {
'lines_logged': 0,
'bytes_logged': 0,
'start_time': None,
'last_entry_time': None
}
def connect(self) -> bool:
"""Connect to serial port"""
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=1
)
print(f"✅ Connected to {self.port}")
print(f"📁 Logging to: {self.log_file}")
return True
except Exception as e:
print(f"❌ Connection failed: {e}")
return False
def start_logging(self):
"""Start data logging"""
if not self.serial:
print("❌ Not connected to serial port")
return False
if self.logging:
print("⚠️ Already logging")
return True
self.logging = True
self.stats['start_time'] = datetime.now()
# Start logging thread
self.log_thread = threading.Thread(target=self._logging_worker)
self.log_thread.daemon = True
self.log_thread.start()
print("🔄 Data logging started")
return True
def _logging_worker(self):
"""Background logging worker"""
with open(self.log_file, 'w', encoding='utf-8') as f:
# Write header
f.write(f"# Serial Data Log\n")
f.write(f"# Port: {self.port}\n")
f.write(f"# Baudrate: {self.baudrate}\n")
f.write(f"# Started: {self.stats['start_time']}\n")
f.write(f"# Format: [TIMESTAMP] DATA\n")
f.write("#" + "="*50 + "\n\n")
f.flush()
while self.logging:
try:
if self.serial.in_waiting:
# Read available data
data = self.serial.read(self.serial.in_waiting)
# Process line by line
lines = data.decode('utf-8', errors='ignore').splitlines()
for line in lines:
if line.strip(): # Skip empty lines
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
log_entry = f"[{timestamp}] {line}\n"
f.write(log_entry)
f.flush() # Ensure data is written immediately
# Update statistics
self.stats['lines_logged'] += 1
self.stats['bytes_logged'] += len(log_entry)
self.stats['last_entry_time'] = datetime.now()
print(f"📝 {line}") # Optional: print to console
except Exception as e:
print(f"Logging error: {e}")
time.sleep(0.01) # Small delay to prevent CPU spinning
def stop_logging(self):
"""Stop data logging"""
if not self.logging:
return
self.logging = False
if hasattr(self, 'log_thread'):
self.log_thread.join()
# Write summary
self._write_summary()
print("⏹️ Data logging stopped")
print(f"📊 Summary: {self.stats['lines_logged']} lines, {self.stats['bytes_logged']} bytes")
def _write_summary(self):
"""Write logging summary to file"""
try:
with open(self.log_file, 'a', encoding='utf-8') as f:
f.write(f"\n# Logging Summary\n")
f.write(f"# Stopped: {datetime.now()}\n")
f.write(f"# Lines logged: {self.stats['lines_logged']}\n")
f.write(f"# Bytes logged: {self.stats['bytes_logged']}\n")
if self.stats['start_time'] and self.stats['last_entry_time']:
duration = self.stats['last_entry_time'] - self.stats['start_time']
f.write(f"# Duration: {duration}\n")
except Exception as e:
print(f"Error writing summary: {e}")
def get_statistics(self):
"""Get logging statistics"""
stats = self.stats.copy()
if stats['start_time']:
if self.logging and stats['last_entry_time']:
stats['duration'] = stats['last_entry_time'] - stats['start_time']
elif not self.logging:
stats['duration'] = datetime.now() - stats['start_time']
if stats.get('duration'):
stats['lines_per_second'] = stats['lines_logged'] / stats['duration'].total_seconds()
stats['bytes_per_second'] = stats['bytes_logged'] / stats['duration'].total_seconds()
return stats
def close(self):
"""Close logger and serial connection"""
self.stop_logging()
if self.serial:
self.serial.close()
# Example usage
logger = SerialDataLogger('/dev/ttyUSB0', 115200)
if logger.connect():
logger.start_logging()
try:
# Log for 30 seconds
time.sleep(30)
# Show statistics
stats = logger.get_statistics()
print(f"\n📈 Logging Statistics:")
print(f" Lines: {stats['lines_logged']}")
print(f" Bytes: {stats['bytes_logged']}")
if 'lines_per_second' in stats:
print(f" Rate: {stats['lines_per_second']:.1f} lines/sec")
except KeyboardInterrupt:
print("\nStopping logger...")
finally:
logger.close()
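Log files in the [TIMESTAMP] DATA format above are easy to post-process. A minimal sketch (assuming the default format written by SerialDataLogger; the filename is illustrative) that reads a log back into timestamped records:
from datetime import datetime
from pathlib import Path
import re

def read_serial_log(log_path: str):
    """Parse a '[TIMESTAMP] DATA' log file back into (datetime, str) tuples."""
    entries = []
    pattern = re.compile(r'^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] (.*)$')
    for line in Path(log_path).read_text(encoding='utf-8').splitlines():
        if not line.strip() or line.startswith('#'):
            continue  # skip blank lines and header/summary comments
        match = pattern.match(line)
        if match:
            timestamp = datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S.%f')
            entries.append((timestamp, match.group(2)))
    return entries

# Example (hypothetical filename):
# entries = read_serial_log('serial_logs/serial_log_20240101_120000.txt')
# print(f"{len(entries)} entries between {entries[0][0]} and {entries[-1][0]}")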
CSV Data Logger
Log parsed fields to CSV with pluggable parsers, so each reading becomes a structured row rather than a raw text line.
import csv
import serial
import threading
import time
from datetime import datetime
from typing import List, Dict, Callable, Optional
import re
class SerialCSVLogger:
def __init__(self, port: str, baudrate: int = 9600, csv_file: str = None):
self.port = port
self.baudrate = baudrate
self.serial = None
self.logging = False
# Generate CSV filename if not provided
if csv_file is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.csv_file = f"serial_data_{timestamp}.csv"
else:
self.csv_file = csv_file
# Data parsing configuration
self.columns = ['timestamp', 'raw_data']
self.data_parsers = []
self.field_separator = ','
self.decimal_separator = '.'
# Statistics
self.rows_written = 0
def add_data_parser(self, name: str, parser_func: Callable[[str], Dict]):
"""Add custom data parser"""
self.data_parsers.append({
'name': name,
'parser': parser_func
})
def set_csv_columns(self, columns: List[str]):
"""Set CSV column headers"""
self.columns = ['timestamp'] + columns
def connect(self) -> bool:
"""Connect to serial port"""
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=1
)
print(f"✅ Connected to {self.port}")
print(f"📊 CSV file: {self.csv_file}")
return True
except Exception as e:
print(f"❌ Connection failed: {e}")
return False
def start_logging(self):
"""Start CSV logging"""
if not self.serial:
return False
self.logging = True
# Initialize CSV file with headers
with open(self.csv_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(self.columns)
# Start logging thread
self.log_thread = threading.Thread(target=self._csv_logging_worker)
self.log_thread.daemon = True
self.log_thread.start()
print("📈 CSV logging started")
return True
def _csv_logging_worker(self):
"""CSV logging worker"""
with open(self.csv_file, 'a', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
while self.logging:
try:
if self.serial.in_waiting:
line = self.serial.readline().decode('utf-8', errors='ignore').strip()
if line:
# Parse data
parsed_data = self._parse_data_line(line)
if parsed_data:
# Write to CSV
row = [datetime.now().isoformat()] + [
parsed_data.get(col, '') for col in self.columns[1:]
]
writer.writerow(row)
f.flush()
self.rows_written += 1
if self.rows_written % 100 == 0:
print(f"📊 {self.rows_written} rows logged")
except Exception as e:
print(f"CSV logging error: {e}")
time.sleep(0.01)
def _parse_data_line(self, line: str) -> Optional[Dict]:
"""Parse data line using configured parsers"""
# Always include raw data
parsed_data = {'raw_data': line}
# Apply custom parsers
for parser_info in self.data_parsers:
try:
result = parser_info['parser'](line)
if isinstance(result, dict):
parsed_data.update(result)
except Exception as e:
print(f"Parser '{parser_info['name']}' error: {e}")
# Auto-parse numeric fields if no custom parsers
if len(self.data_parsers) == 0:
parsed_data.update(self._auto_parse_numeric(line))
return parsed_data
def _auto_parse_numeric(self, line: str) -> Dict:
"""Automatically parse numeric values from line"""
data = {}
# Try comma-separated values
if self.field_separator in line:
parts = line.split(self.field_separator)
for i, part in enumerate(parts):
try:
# Try to convert to float
value = float(part.strip().replace(self.decimal_separator, '.'))
data[f'field_{i+1}'] = value
except ValueError:
data[f'field_{i+1}'] = part.strip()
# Try to extract all numbers
numbers = re.findall(r'-?\d+\.?\d*', line)
for i, num in enumerate(numbers):
try:
data[f'number_{i+1}'] = float(num)
except ValueError:
pass
return data
def stop_logging(self):
"""Stop CSV logging"""
self.logging = False
if hasattr(self, 'log_thread'):
self.log_thread.join()
print(f"📈 CSV logging stopped. {self.rows_written} rows written.")
def close(self):
"""Close logger"""
self.stop_logging()
if self.serial:
self.serial.close()
# Example parsers for common data formats
def temperature_humidity_parser(line: str) -> Dict:
"""Parse temperature and humidity data"""
# Example: "Temperature: 25.5°C, Humidity: 60.2%"
data = {}
temp_match = re.search(r'temperature[:\s]+([0-9.-]+)', line, re.IGNORECASE)
if temp_match:
data['temperature'] = float(temp_match.group(1))
humid_match = re.search(r'humidity[:\s]+([0-9.-]+)', line, re.IGNORECASE)
if humid_match:
data['humidity'] = float(humid_match.group(1))
return data
def nmea_gps_parser(line: str) -> Dict:
"""Parse NMEA GPS sentences"""
data = {}
if line.startswith('$GPGGA'):
parts = line.split(',')
if len(parts) > 9:
try:
# Extract latitude, longitude, altitude
if parts[2] and parts[3]: # Latitude
lat = float(parts[2][:2]) + float(parts[2][2:]) / 60
if parts[3] == 'S':
lat = -lat
data['latitude'] = lat
if parts[4] and parts[5]: # Longitude
lon = float(parts[4][:3]) + float(parts[4][3:]) / 60
if parts[5] == 'W':
lon = -lon
data['longitude'] = lon
if parts[9]: # Altitude
data['altitude'] = float(parts[9])
if parts[7]: # Number of satellites
data['satellites'] = int(parts[7])
except (ValueError, IndexError):
pass
return data
def sensor_array_parser(line: str) -> Dict:
"""Parse sensor array data format"""
# Example: "S1:25.5,S2:60.2,S3:1013.2,S4:15.8"
data = {}
sensor_matches = re.findall(r'S(\d+):([0-9.-]+)', line)
for sensor_num, value in sensor_matches:
data[f'sensor_{sensor_num}'] = float(value)
return data
# Example usage with custom parsers
csv_logger = SerialCSVLogger('/dev/ttyUSB0', 9600, 'sensor_data.csv')
# Configure columns and parser
csv_logger.set_csv_columns(['raw_data', 'temperature', 'humidity', 'pressure'])
csv_logger.add_data_parser('temp_humidity', temperature_humidity_parser)
if csv_logger.connect():
csv_logger.start_logging()
try:
time.sleep(60) # Log for 1 minute
except KeyboardInterrupt:
pass
finally:
csv_logger.close()
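Once a CSV has been captured, pandas gives a quick way to inspect it. A minimal sketch, assuming pandas is installed and the column layout configured above (timestamp, raw_data, temperature, humidity, pressure); adjust the column names to whatever your parsers actually produce:
import pandas as pd

# Load the CSV produced by SerialCSVLogger
df = pd.read_csv('sensor_data.csv', parse_dates=['timestamp'])

# Quick numeric summary of the parsed fields
print(df[['temperature', 'humidity']].describe())

# Resample to one-minute means for a smoother view
per_minute = df.set_index('timestamp')[['temperature', 'humidity']].resample('1min').mean()
print(per_minute.tail())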
JSON Data Logger
Capture each reading as a structured JSON entry with session metadata, buffered in memory and flushed to disk periodically.
import json
import serial
import threading
import time
from datetime import datetime
from typing import Dict, List, Any
import uuid
import os
class SerialJSONLogger:
def __init__(self, port: str, baudrate: int = 9600, json_file: str = None):
self.port = port
self.baudrate = baudrate
self.serial = None
self.logging = False
# Generate JSON filename if not provided
if json_file is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.json_file = f"serial_data_{timestamp}.json"
else:
self.json_file = json_file
# Configuration
self.metadata = {
'session_id': str(uuid.uuid4()),
'port': port,
'baudrate': baudrate,
'start_time': None,
'end_time': None
}
self.data_entries = []
self.max_entries_in_memory = 1000
self.auto_flush_interval = 10 # seconds
def connect(self) -> bool:
"""Connect to serial port"""
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=1
)
print(f"✅ Connected to {self.port}")
print(f"📄 JSON file: {self.json_file}")
return True
except Exception as e:
print(f"❌ Connection failed: {e}")
return False
def start_logging(self):
"""Start JSON logging"""
if not self.serial:
return False
self.logging = True
self.metadata['start_time'] = datetime.now().isoformat()
# Initialize JSON file
self._write_initial_json()
# Start logging thread
self.log_thread = threading.Thread(target=self._json_logging_worker)
self.log_thread.daemon = True
self.log_thread.start()
# Start auto-flush thread
self.flush_thread = threading.Thread(target=self._auto_flush_worker)
self.flush_thread.daemon = True
self.flush_thread.start()
print("🗂️ JSON logging started")
return True
def _write_initial_json(self):
"""Write initial JSON structure"""
initial_data = {
'metadata': self.metadata,
'data': []
}
with open(self.json_file, 'w', encoding='utf-8') as f:
json.dump(initial_data, f, indent=2)
def _json_logging_worker(self):
"""JSON logging worker"""
while self.logging:
try:
if self.serial.in_waiting:
line = self.serial.readline().decode('utf-8', errors='ignore').strip()
if line:
# Create data entry
entry = {
'timestamp': datetime.now().isoformat(),
'raw_data': line,
'parsed_data': self._parse_structured_data(line),
'metadata': {
'length': len(line),
'encoding': 'utf-8'
}
}
self.data_entries.append(entry)
# Flush if memory limit reached
if len(self.data_entries) >= self.max_entries_in_memory:
self._flush_to_file()
except Exception as e:
print(f"JSON logging error: {e}")
time.sleep(0.01)
def _auto_flush_worker(self):
"""Automatic flush worker"""
while self.logging:
time.sleep(self.auto_flush_interval)
if self.data_entries:
self._flush_to_file()
def _flush_to_file(self):
"""Flush data entries to JSON file"""
if not self.data_entries:
return
try:
# Read existing data
with open(self.json_file, 'r', encoding='utf-8') as f:
json_data = json.load(f)
# Add new entries
json_data['data'].extend(self.data_entries)
json_data['metadata'] = self.metadata
# Write back to file
with open(self.json_file, 'w', encoding='utf-8') as f:
json.dump(json_data, f, indent=2)
print(f"💾 Flushed {len(self.data_entries)} entries to JSON file")
self.data_entries.clear()
except Exception as e:
print(f"Error flushing to JSON file: {e}")
def _parse_structured_data(self, line: str) -> Dict[str, Any]:
"""Parse line into structured data"""
parsed = {}
# Try to parse as JSON first
try:
parsed = json.loads(line)
parsed['format'] = 'json'
return parsed
except json.JSONDecodeError:
pass
# Try key-value pairs
if '=' in line and ',' in line:
pairs = line.split(',')
for pair in pairs:
if '=' in pair:
key, value = pair.split('=', 1)
key = key.strip()
value = value.strip()
# Try to convert to number
try:
if '.' in value:
parsed[key] = float(value)
else:
parsed[key] = int(value)
except ValueError:
parsed[key] = value
if parsed:
parsed['format'] = 'key_value'
return parsed
# Try comma-separated values
if ',' in line:
values = [v.strip() for v in line.split(',')]
for i, value in enumerate(values):
try:
if '.' in value:
parsed[f'value_{i}'] = float(value)
else:
parsed[f'value_{i}'] = int(value)
except ValueError:
parsed[f'value_{i}'] = value
if parsed:
parsed['format'] = 'csv'
return parsed
# Default: treat as single value
try:
if '.' in line:
parsed['value'] = float(line)
else:
parsed['value'] = int(line)
except ValueError:
parsed['value'] = line
parsed['format'] = 'single_value'
return parsed
def add_custom_metadata(self, key: str, value: Any):
"""Add custom metadata"""
self.metadata[key] = value
def stop_logging(self):
"""Stop JSON logging"""
self.logging = False
if hasattr(self, 'log_thread'):
self.log_thread.join()
if hasattr(self, 'flush_thread'):
self.flush_thread.join()
# Final flush
if self.data_entries:
self._flush_to_file()
# Update metadata
self.metadata['end_time'] = datetime.now().isoformat()
self._update_metadata()
print("🗂️ JSON logging stopped")
def _update_metadata(self):
"""Update metadata in JSON file"""
try:
with open(self.json_file, 'r', encoding='utf-8') as f:
json_data = json.load(f)
json_data['metadata'] = self.metadata
with open(self.json_file, 'w', encoding='utf-8') as f:
json.dump(json_data, f, indent=2)
except Exception as e:
print(f"Error updating metadata: {e}")
def get_data_summary(self) -> Dict:
"""Get summary of logged data"""
try:
with open(self.json_file, 'r', encoding='utf-8') as f:
json_data = json.load(f)
total_entries = len(json_data['data'])
formats = {}
for entry in json_data['data']:
format_type = entry.get('parsed_data', {}).get('format', 'unknown')
formats[format_type] = formats.get(format_type, 0) + 1
return {
'total_entries': total_entries,
'formats': formats,
'file_size': os.path.getsize(self.json_file) if os.path.exists(self.json_file) else 0,
'metadata': json_data['metadata']
}
except Exception as e:
return {'error': str(e)}
def close(self):
"""Close logger"""
self.stop_logging()
if self.serial:
self.serial.close()
# Example usage
json_logger = SerialJSONLogger('/dev/ttyUSB0', 9600, 'sensor_data.json')
# Add custom metadata
json_logger.add_custom_metadata('experiment', 'Temperature Monitoring')
json_logger.add_custom_metadata('location', 'Lab Room 101')
json_logger.add_custom_metadata('operator', 'John Doe')
if json_logger.connect():
json_logger.start_logging()
try:
time.sleep(60) # Log for 1 minute
# Get summary
summary = json_logger.get_data_summary()
print(f"📊 Data Summary: {summary}")
except KeyboardInterrupt:
pass
finally:
json_logger.close()
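The resulting JSON file holds a data array of entries with nested parsed_data, which flattens neatly into a table. A minimal sketch, assuming pandas is available and the file was written by SerialJSONLogger above:
import json
import pandas as pd

with open('sensor_data.json', 'r', encoding='utf-8') as f:
    log = json.load(f)

print(f"Session {log['metadata'].get('session_id')}: {len(log['data'])} entries")

# Nested parsed_data fields become columns like 'parsed_data.temperature'
df = pd.json_normalize(log['data'])
df['timestamp'] = pd.to_datetime(df['timestamp'])
print(df.head())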
Database Integration
Store serial data in databases for efficient querying, analysis, and long-term storage with proper indexing and data types.
import serial
import sqlite3
import threading
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import statistics
import json
class SerialDatabaseLogger:
def __init__(self, port: str, baudrate: int = 9600, db_path: str = 'serial_data.db'):
self.port = port
self.baudrate = baudrate
self.db_path = db_path
self.serial = None
self.logging = False
# Initialize database
self._init_database()
# Session management
self.session_id = None
def _init_database(self):
"""Initialize SQLite database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create tables
cursor.execute('''
CREATE TABLE IF NOT EXISTS sessions (
session_id INTEGER PRIMARY KEY AUTOINCREMENT,
port TEXT NOT NULL,
baudrate INTEGER NOT NULL,
start_time DATETIME NOT NULL,
end_time DATETIME,
description TEXT,
metadata TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS raw_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER REFERENCES sessions(session_id),
timestamp DATETIME NOT NULL,
raw_data TEXT NOT NULL,
data_length INTEGER,
checksum TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS parsed_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER REFERENCES sessions(session_id),
timestamp DATETIME NOT NULL,
data_type TEXT NOT NULL,
field_name TEXT NOT NULL,
numeric_value REAL,
text_value TEXT,
boolean_value BOOLEAN
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER REFERENCES sessions(session_id),
timestamp DATETIME NOT NULL,
event_type TEXT NOT NULL,
description TEXT,
severity INTEGER DEFAULT 1
)
''')
# Create indexes for performance
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_raw_data_timestamp
ON raw_data(timestamp)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_parsed_data_timestamp
ON parsed_data(timestamp)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_parsed_data_field
ON parsed_data(field_name, timestamp)
''')
conn.commit()
conn.close()
def connect(self, description: str = "") -> bool:
"""Connect to serial port and start new session"""
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=1
)
# Create new session
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO sessions (port, baudrate, start_time, description)
VALUES (?, ?, ?, ?)
''', (self.port, self.baudrate, datetime.now(), description))
self.session_id = cursor.lastrowid
conn.commit()
conn.close()
print(f"✅ Connected to {self.port}")
print(f"🗄️ Database session {self.session_id} started")
return True
except Exception as e:
print(f"❌ Connection failed: {e}")
return False
def start_logging(self):
"""Start database logging"""
if not self.serial or not self.session_id:
return False
self.logging = True
# Start logging thread
self.log_thread = threading.Thread(target=self._database_logging_worker)
self.log_thread.daemon = True
self.log_thread.start()
print("🗄️ Database logging started")
return True
def _database_logging_worker(self):
"""Database logging worker"""
batch_size = 100
batch_raw_data = []
batch_parsed_data = []
while self.logging:
try:
if self.serial.in_waiting:
line = self.serial.readline().decode('utf-8', errors='ignore').strip()
if line:
timestamp = datetime.now()
# Prepare raw data entry
raw_entry = (
self.session_id,
timestamp,
line,
len(line),
self._calculate_checksum(line)
)
batch_raw_data.append(raw_entry)
# Parse and prepare structured data
parsed_fields = self._parse_line_to_fields(line, timestamp)
batch_parsed_data.extend(parsed_fields)
# Batch insert when limit reached
if len(batch_raw_data) >= batch_size:
self._batch_insert(batch_raw_data, batch_parsed_data)
batch_raw_data.clear()
batch_parsed_data.clear()
except Exception as e:
self._log_event('error', f'Logging error: {e}', severity=3)
time.sleep(0.01)
# Final batch insert
if batch_raw_data or batch_parsed_data:
self._batch_insert(batch_raw_data, batch_parsed_data)
def _batch_insert(self, raw_data: List, parsed_data: List):
"""Batch insert data to database"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Insert raw data
if raw_data:
cursor.executemany('''
INSERT INTO raw_data (session_id, timestamp, raw_data, data_length, checksum)
VALUES (?, ?, ?, ?, ?)
''', raw_data)
# Insert parsed data
if parsed_data:
cursor.executemany('''
INSERT INTO parsed_data (session_id, timestamp, data_type, field_name,
numeric_value, text_value, boolean_value)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', parsed_data)
conn.commit()
conn.close()
print(f"💾 Inserted {len(raw_data)} raw, {len(parsed_data)} parsed records")
except Exception as e:
self._log_event('error', f'Database insert error: {e}', severity=3)
def _parse_line_to_fields(self, line: str, timestamp: datetime) -> List:
"""Parse line into database fields"""
parsed_entries = []
# Try JSON format
try:
data = json.loads(line)
for key, value in data.items():
parsed_entries.append(self._create_parsed_entry(
timestamp, 'json', key, value
))
return parsed_entries
except json.JSONDecodeError:
pass
# Try key=value format
if '=' in line:
pairs = line.replace(',', ' ').split()
for pair in pairs:
if '=' in pair:
key, value = pair.split('=', 1)
parsed_entries.append(self._create_parsed_entry(
timestamp, 'keyvalue', key.strip(), value.strip()
))
if parsed_entries:
return parsed_entries
# Try comma-separated values
if ',' in line:
values = [v.strip() for v in line.split(',')]
for i, value in enumerate(values):
parsed_entries.append(self._create_parsed_entry(
timestamp, 'csv', f'field_{i}', value
))
if parsed_entries:
return parsed_entries
# Single value
parsed_entries.append(self._create_parsed_entry(
timestamp, 'single', 'value', line
))
return parsed_entries
def _create_parsed_entry(self, timestamp: datetime, data_type: str,
field_name: str, value: Any) -> tuple:
"""Create parsed data entry tuple"""
numeric_value = None
text_value = None
boolean_value = None
# Determine value type and store appropriately
if isinstance(value, bool):
boolean_value = value
elif isinstance(value, (int, float)):
numeric_value = float(value)
else:
# Try to convert string to number
try:
numeric_value = float(value)
except (ValueError, TypeError):
# Try boolean
if str(value).lower() in ('true', 'false', '1', '0', 'on', 'off'):
boolean_value = str(value).lower() in ('true', '1', 'on')
else:
text_value = str(value)
return (
self.session_id,
timestamp,
data_type,
field_name,
numeric_value,
text_value,
boolean_value
)
def _calculate_checksum(self, data: str) -> str:
"""Calculate simple checksum for data integrity"""
return hex(sum(ord(c) for c in data) & 0xFFFF)
def _log_event(self, event_type: str, description: str, severity: int = 1):
"""Log system event"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO events (session_id, timestamp, event_type, description, severity)
VALUES (?, ?, ?, ?, ?)
''', (self.session_id, datetime.now(), event_type, description, severity))
conn.commit()
conn.close()
except Exception as e:
print(f"Error logging event: {e}")
def query_data(self, field_name: str, hours: int = 24) -> List[Dict]:
"""Query parsed data for specific field"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT timestamp, numeric_value, text_value, boolean_value
FROM parsed_data
WHERE session_id = ? AND field_name = ?
AND timestamp >= datetime('now', '-{} hours')
ORDER BY timestamp
'''.format(hours), (self.session_id, field_name))
results = []
for timestamp, num_val, text_val, bool_val in cursor.fetchall():
value = num_val if num_val is not None else text_val if text_val is not None else bool_val
results.append({
'timestamp': timestamp,
'value': value
})
conn.close()
return results
def get_statistics(self, field_name: str, hours: int = 24) -> Dict:
"""Get statistical analysis for numeric field"""
data = self.query_data(field_name, hours)
numeric_data = [item['value'] for item in data if isinstance(item['value'], (int, float))]
if not numeric_data:
return {'error': 'No numeric data found'}
stats = {
'count': len(numeric_data),
'min': min(numeric_data),
'max': max(numeric_data),
'mean': statistics.mean(numeric_data),
'median': statistics.median(numeric_data),
'std_dev': statistics.stdev(numeric_data) if len(numeric_data) > 1 else 0,
'first_timestamp': data[0]['timestamp'] if data else None,
'last_timestamp': data[-1]['timestamp'] if data else None
}
return stats
def export_session_data(self, filename: str, format: str = 'csv'):
"""Export session data to file"""
conn = sqlite3.connect(self.db_path)
if format.lower() == 'csv':
import csv
cursor = conn.cursor()
cursor.execute('''
SELECT r.timestamp, r.raw_data, p.field_name, p.numeric_value, p.text_value, p.boolean_value
FROM raw_data r
LEFT JOIN parsed_data p ON r.timestamp = p.timestamp
WHERE r.session_id = ?
ORDER BY r.timestamp
''', (self.session_id,))
with open(filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['Timestamp', 'Raw Data', 'Field Name', 'Numeric Value', 'Text Value', 'Boolean Value'])
writer.writerows(cursor.fetchall())
elif format.lower() == 'json':
cursor = conn.cursor()
cursor.execute('''
SELECT timestamp, raw_data FROM raw_data WHERE session_id = ? ORDER BY timestamp
''', (self.session_id,))
data = [{'timestamp': ts, 'raw_data': raw} for ts, raw in cursor.fetchall()]
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, default=str)
conn.close()
print(f"📤 Session data exported to {filename}")
def stop_logging(self):
"""Stop database logging"""
self.logging = False
if hasattr(self, 'log_thread'):
self.log_thread.join()
# Update session end time
if self.session_id:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
UPDATE sessions SET end_time = ? WHERE session_id = ?
''', (datetime.now(), self.session_id))
conn.commit()
conn.close()
self._log_event('info', 'Logging session ended')
print("🗄️ Database logging stopped")
def close(self):
"""Close logger and connection"""
self.stop_logging()
if self.serial:
self.serial.close()
# Example usage
db_logger = SerialDatabaseLogger('/dev/ttyUSB0', 9600, 'sensor_data.db')
if db_logger.connect("Temperature and Humidity Monitoring Experiment"):
db_logger.start_logging()
try:
time.sleep(120) # Log for 2 minutes
# Query data
temp_data = db_logger.query_data('temperature', hours=1)
print(f"📊 Temperature readings: {len(temp_data)}")
# Get statistics
if temp_data:
stats = db_logger.get_statistics('temperature', hours=1)
print(f"📈 Temperature stats: Min={stats['min']:.1f}, Max={stats['max']:.1f}, Mean={stats['mean']:.1f}")
# Export data
db_logger.export_session_data('session_data.csv', 'csv')
except KeyboardInterrupt:
pass
finally:
db_logger.close()
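Because the data lives in SQLite, it can also be queried directly with SQL, independent of the logger class. A sketch of an hourly-average query against the parsed_data table defined above; the session ID and field name are assumptions, substitute whatever your device actually reports:
import sqlite3

conn = sqlite3.connect('sensor_data.db')
cursor = conn.cursor()

# Hourly aggregates for one field within one session
cursor.execute('''
    SELECT strftime('%Y-%m-%d %H:00', timestamp) AS hour,
           COUNT(*) AS samples,
           AVG(numeric_value) AS avg_value,
           MIN(numeric_value) AS min_value,
           MAX(numeric_value) AS max_value
    FROM parsed_data
    WHERE session_id = ? AND field_name = ? AND numeric_value IS NOT NULL
    GROUP BY hour
    ORDER BY hour
''', (1, 'temperature'))  # assumed session ID and field name

for hour, samples, avg_value, min_value, max_value in cursor.fetchall():
    print(f"{hour}  n={samples}  avg={avg_value:.2f}  min={min_value:.2f}  max={max_value:.2f}")

conn.close()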
Real-time Analysis and Visualization
Live Data Dashboard
Build a Tkinter dashboard that shows incoming serial data live, with a scrolling raw-data view, a real-time matplotlib graph, and periodically updated statistics.
import tkinter as tk
from tkinter import ttk
import threading
import time
from datetime import datetime, timedelta
import queue
from collections import deque
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.animation import FuncAnimation
import numpy as np
class RealTimeDataDashboard:
def __init__(self, serial_port: str, baudrate: int = 9600):
self.port = serial_port
self.baudrate = baudrate
self.serial = None
# Data storage
self.data_queue = queue.Queue()
self.time_series_data = {
'timestamps': deque(maxlen=1000),
'values': deque(maxlen=1000)
}
# GUI setup
self.root = tk.Tk()
self.root.title("Real-time Serial Data Dashboard")
self.root.geometry("1200x800")
# Status variables
self.connected = tk.BooleanVar(value=False)
self.logging = tk.BooleanVar(value=False)
self.total_messages = tk.IntVar(value=0)
self.messages_per_second = tk.DoubleVar(value=0.0)
# Create interface
self._create_interface()
# Data processing
self.data_processor = threading.Thread(target=self._process_data_queue)
self.data_processor.daemon = True
self.data_processor.start()
def _create_interface(self):
"""Create dashboard interface"""
# Control panel
control_frame = ttk.LabelFrame(self.root, text="Control Panel")
control_frame.pack(fill='x', padx=5, pady=5)
# Connection controls
conn_frame = ttk.Frame(control_frame)
conn_frame.pack(fill='x', padx=5, pady=5)
ttk.Button(conn_frame, text="Connect", command=self._connect).pack(side='left', padx=5)
ttk.Button(conn_frame, text="Disconnect", command=self._disconnect).pack(side='left', padx=5)
ttk.Button(conn_frame, text="Start Logging", command=self._start_logging).pack(side='left', padx=5)
ttk.Button(conn_frame, text="Stop Logging", command=self._stop_logging).pack(side='left', padx=5)
ttk.Button(conn_frame, text="Clear Data", command=self._clear_data).pack(side='left', padx=5)
# Status display
status_frame = ttk.LabelFrame(self.root, text="Status")
status_frame.pack(fill='x', padx=5, pady=5)
status_grid = ttk.Frame(status_frame)
status_grid.pack(fill='x', padx=5, pady=5)
ttk.Label(status_grid, text="Connected:").grid(row=0, column=0, sticky='w')
ttk.Label(status_grid, textvariable=self.connected).grid(row=0, column=1, sticky='w')
ttk.Label(status_grid, text="Logging:").grid(row=0, column=2, sticky='w', padx=(20,0))
ttk.Label(status_grid, textvariable=self.logging).grid(row=0, column=3, sticky='w')
ttk.Label(status_grid, text="Total Messages:").grid(row=1, column=0, sticky='w')
ttk.Label(status_grid, textvariable=self.total_messages).grid(row=1, column=1, sticky='w')
ttk.Label(status_grid, text="Messages/sec:").grid(row=1, column=2, sticky='w', padx=(20,0))
ttk.Label(status_grid, textvariable=self.messages_per_second).grid(row=1, column=3, sticky='w')
# Data display
data_notebook = ttk.Notebook(self.root)
data_notebook.pack(fill='both', expand=True, padx=5, pady=5)
# Raw data tab
raw_frame = ttk.Frame(data_notebook)
data_notebook.add(raw_frame, text="Raw Data")
self.raw_text = tk.Text(raw_frame, height=15, wrap=tk.WORD)
raw_scrollbar = ttk.Scrollbar(raw_frame, orient="vertical", command=self.raw_text.yview)
self.raw_text.configure(yscrollcommand=raw_scrollbar.set)
self.raw_text.pack(side="left", fill="both", expand=True)
raw_scrollbar.pack(side="right", fill="y")
# Graph tab
graph_frame = ttk.Frame(data_notebook)
data_notebook.add(graph_frame, text="Real-time Graph")
# Create matplotlib figure
self.fig, self.ax = plt.subplots(figsize=(10, 6))
self.ax.set_title("Real-time Data")
self.ax.set_xlabel("Time")
self.ax.set_ylabel("Value")
self.ax.grid(True)
self.canvas = FigureCanvasTkAgg(self.fig, graph_frame)
self.canvas.get_tk_widget().pack(fill='both', expand=True)
# Start animation
self.animation = FuncAnimation(
self.fig, self._update_graph, interval=100, blit=False
)
# Statistics tab
stats_frame = ttk.Frame(data_notebook)
data_notebook.add(stats_frame, text="Statistics")
self.stats_text = tk.Text(stats_frame, height=15, wrap=tk.WORD)
stats_scrollbar = ttk.Scrollbar(stats_frame, orient="vertical", command=self.stats_text.yview)
self.stats_text.configure(yscrollcommand=stats_scrollbar.set)
self.stats_text.pack(side="left", fill="both", expand=True)
stats_scrollbar.pack(side="right", fill="y")
# Update statistics every 5 seconds
self.root.after(5000, self._update_statistics)
def _connect(self):
"""Connect to serial port"""
try:
import serial
self.serial = serial.Serial(self.port, self.baudrate, timeout=1)
self.connected.set(True)
print(f"✅ Connected to {self.port}")
except Exception as e:
print(f"❌ Connection failed: {e}")
def _disconnect(self):
"""Disconnect from serial port"""
if self.serial:
self.serial.close()
self.serial = None
self.connected.set(False)
self.logging.set(False)
print("🔌 Disconnected")
def _start_logging(self):
"""Start data logging"""
if not self.serial:
print("❌ Not connected")
return
self.logging.set(True)
self.log_thread = threading.Thread(target=self._logging_worker)
self.log_thread.daemon = True
self.log_thread.start()
print("🔄 Logging started")
def _stop_logging(self):
"""Stop data logging"""
self.logging.set(False)
print("⏹️ Logging stopped")
def _clear_data(self):
"""Clear all data"""
self.time_series_data['timestamps'].clear()
self.time_series_data['values'].clear()
self.total_messages.set(0)
# Clear text widgets
self.raw_text.delete(1.0, tk.END)
self.stats_text.delete(1.0, tk.END)
print("🗑️ Data cleared")
def _logging_worker(self):
"""Background logging worker"""
message_count = 0
start_time = time.time()
while self.logging.get() and self.serial:
try:
if self.serial.in_waiting:
line = self.serial.readline().decode('utf-8', errors='ignore').strip()
if line:
# Add to queue for processing
self.data_queue.put({
'timestamp': datetime.now(),
'data': line
})
message_count += 1
# Update rate calculation
elapsed = time.time() - start_time
if elapsed >= 1.0:
rate = message_count / elapsed
self.messages_per_second.set(round(rate, 2))
message_count = 0
start_time = time.time()
except Exception as e:
print(f"Logging error: {e}")
time.sleep(0.001)
def _process_data_queue(self):
"""Process data from queue"""
while True:
try:
# Get data from queue
data_item = self.data_queue.get(timeout=1)
timestamp = data_item['timestamp']
raw_data = data_item['data']
# Update total count
current_total = self.total_messages.get()
self.total_messages.set(current_total + 1)
# Add to raw text display (GUI thread safe)
self.root.after(0, self._add_to_raw_display, timestamp, raw_data)
# Try to extract numeric value for graphing
numeric_value = self._extract_numeric_value(raw_data)
if numeric_value is not None:
self.time_series_data['timestamps'].append(timestamp)
self.time_series_data['values'].append(numeric_value)
except queue.Empty:
continue
except Exception as e:
print(f"Data processing error: {e}")
def _add_to_raw_display(self, timestamp, data):
"""Add data to raw text display (GUI thread)"""
self.raw_text.insert(tk.END, f"[{timestamp.strftime('%H:%M:%S.%f')[:-3]}] {data}\n")
# Auto-scroll to bottom
self.raw_text.see(tk.END)
# Limit text size (keep last 1000 lines)
lines = int(self.raw_text.index('end-1c').split('.')[0])
if lines > 1000:
self.raw_text.delete(1.0, f"{lines-1000}.0")
def _extract_numeric_value(self, data: str) -> float:
"""Extract numeric value from data string"""
import re
# Try patterns from most specific to most generic so labelled values win
patterns = [
r'value[:\s]*(-?[0-9.]+)', # "value: 123.45"
r'temp[:\s]*(-?[0-9.]+)', # "temp: 25.5"
r'(-?[0-9.]+)\s*°?[CF]?', # temperature readings like "25.5°C"
r'(-?\d+\.?\d*)', # any number (fallback)
]
for pattern in patterns:
match = re.search(pattern, data, re.IGNORECASE)
if match:
try:
return float(match.group(1))
except ValueError:
continue
return None
def _update_graph(self, frame):
"""Update real-time graph"""
if not self.time_series_data['timestamps']:
return
# Clear and redraw
self.ax.clear()
# Convert timestamps to matplotlib format
timestamps = list(self.time_series_data['timestamps'])
values = list(self.time_series_data['values'])
if timestamps and values:
# Plot data
self.ax.plot(timestamps, values, 'b-', linewidth=1, alpha=0.7)
self.ax.scatter(timestamps[-10:], values[-10:], color='red', s=20, alpha=0.8)
# Format x-axis
self.ax.set_xlabel("Time")
self.ax.set_ylabel("Value")
self.ax.set_title(f"Real-time Data ({len(values)} points)")
self.ax.grid(True, alpha=0.3)
# Auto-scale
self.ax.relim()
self.ax.autoscale_view()
# Redraw
self.canvas.draw()
def _update_statistics(self):
"""Update statistics display"""
if not self.time_series_data['values']:
self.root.after(5000, self._update_statistics)
return
values = list(self.time_series_data['values'])
timestamps = list(self.time_series_data['timestamps'])
# Calculate statistics
stats_text = "Real-time Data Statistics\n"
stats_text += "=" * 30 + "\n\n"
stats_text += f"Data Points: {len(values)}\n"
stats_text += f"Time Range: {timestamps[0].strftime('%H:%M:%S')} - {timestamps[-1].strftime('%H:%M:%S')}\n\n"
if values:
stats_text += f"Current Value: {values[-1]:.2f}\n"
stats_text += f"Minimum: {min(values):.2f}\n"
stats_text += f"Maximum: {max(values):.2f}\n"
stats_text += f"Average: {sum(values)/len(values):.2f}\n"
# Calculate standard deviation
mean = sum(values) / len(values)
variance = sum((x - mean) ** 2 for x in values) / len(values)
std_dev = variance ** 0.5
stats_text += f"Std Deviation: {std_dev:.2f}\n"
# Recent trend (last 20 points)
if len(values) >= 20:
recent_values = values[-20:]
trend = recent_values[-1] - recent_values[0]
stats_text += f"Recent Trend: {'+' if trend > 0 else ''}{trend:.2f}\n"
# Update display
self.stats_text.delete(1.0, tk.END)
self.stats_text.insert(tk.END, stats_text)
# Schedule next update
self.root.after(5000, self._update_statistics)
def run(self):
"""Run the dashboard"""
print(f"🎛️ Starting Real-time Data Dashboard")
print(f" Port: {self.port}")
print(f" Baudrate: {self.baudrate}")
try:
self.root.mainloop()
except KeyboardInterrupt:
pass
finally:
if self.serial:
self.serial.close()
# Example usage
dashboard = RealTimeDataDashboard('/dev/ttyUSB0', 9600)
dashboard.run()
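To try the dashboard, or any of the loggers above, without hardware, PySerial's loop:// URL handler can stand in for a device: bytes written to the port are read straight back. The classes above open a real port by name, so this is a standalone sketch for exercising the read-and-parse path:
import serial

# loop:// echoes written bytes back to the reader - handy for testing without a device
sim = serial.serial_for_url('loop://', baudrate=9600, timeout=1)

# Simulate a sensor emitting a few readings
for i in range(3):
    sim.write(f"Temperature: {20 + i}.5, Humidity: {55 + i}.0\n".encode('utf-8'))

# Read them back exactly as the loggers above do
while sim.in_waiting:
    line = sim.readline().decode('utf-8', errors='ignore').strip()
    print('received:', line)

sim.close()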
Data Analysis Tools
Analyze logged sessions offline with pandas: summary statistics, anomaly detection, trend and frequency analysis, and report generation.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
from scipy import signal
from scipy.stats import pearsonr
import sqlite3
class SerialDataAnalyzer:
def __init__(self, db_path: str = 'serial_data.db'):
self.db_path = db_path
def load_session_data(self, session_id: int) -> pd.DataFrame:
"""Load session data into pandas DataFrame"""
conn = sqlite3.connect(self.db_path)
query = '''
SELECT p.timestamp, p.field_name, p.numeric_value, p.text_value, p.boolean_value
FROM parsed_data p
WHERE p.session_id = ?
ORDER BY p.timestamp
'''
df = pd.read_sql_query(query, conn, params=(session_id,))
conn.close()
# Convert timestamp to datetime
df['timestamp'] = pd.to_datetime(df['timestamp'])
# Combine value columns
df['value'] = df['numeric_value'].fillna(
df['text_value'].fillna(df['boolean_value'])
)
return df
def pivot_sensor_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""Pivot data to have sensors as columns"""
pivot_df = df.pivot_table(
index='timestamp',
columns='field_name',
values='value',
aggfunc='first'
)
# Forward fill missing values
pivot_df = pivot_df.ffill()
return pivot_df
def basic_statistics(self, df: pd.DataFrame) -> dict:
"""Calculate basic statistics for all numeric columns"""
numeric_cols = df.select_dtypes(include=[np.number]).columns
stats = {}
for col in numeric_cols:
if col in df.columns:
data = df[col].dropna()
if len(data) > 0:
stats[col] = {
'count': len(data),
'mean': data.mean(),
'std': data.std(),
'min': data.min(),
'max': data.max(),
'median': data.median(),
'q25': data.quantile(0.25),
'q75': data.quantile(0.75)
}
return stats
def detect_anomalies(self, df: pd.DataFrame, column: str, method: str = 'zscore', threshold: float = 3.0) -> pd.DataFrame:
"""Detect anomalies in data"""
if column not in df.columns:
return pd.DataFrame()
data = df[column].dropna()
if method == 'zscore':
z_scores = np.abs((data - data.mean()) / data.std())
anomalies = df[z_scores > threshold]
elif method == 'iqr':
Q1 = data.quantile(0.25)
Q3 = data.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
anomalies = df[(data < lower_bound) | (data > upper_bound)]
elif method == 'isolation_forest':
from sklearn.ensemble import IsolationForest
iso_forest = IsolationForest(contamination=0.1, random_state=42)
outlier_labels = iso_forest.fit_predict(data.values.reshape(-1, 1))
anomalies = df[outlier_labels == -1]
return anomalies
def calculate_moving_averages(self, df: pd.DataFrame, windows: list = [5, 10, 20]) -> pd.DataFrame:
"""Calculate moving averages for numeric columns"""
result_df = df.copy()
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
for window in windows:
result_df[f'{col}_ma{window}'] = df[col].rolling(window=window, min_periods=1).mean()
return result_df
def correlation_analysis(self, df: pd.DataFrame) -> pd.DataFrame:
"""Calculate correlation matrix for numeric columns"""
numeric_df = df.select_dtypes(include=[np.number])
correlation_matrix = numeric_df.corr()
return correlation_matrix
def trend_analysis(self, df: pd.DataFrame, column: str) -> dict:
"""Analyze trend in time series data"""
if column not in df.columns:
return {}
# Ensure data is sorted by timestamp
df_sorted = df.sort_values('timestamp')
data = df_sorted[column].dropna()
if len(data) < 2:
return {'error': 'Insufficient data points'}
# Calculate trend using linear regression
x = np.arange(len(data))
coeffs = np.polyfit(x, data, 1)
trend_slope = coeffs[0]
# Calculate trend strength (R-squared)
y_pred = np.polyval(coeffs, x)
ss_res = np.sum((data - y_pred) ** 2)
ss_tot = np.sum((data - np.mean(data)) ** 2)
r_squared = 1 - (ss_res / ss_tot)
# Determine trend direction
if abs(trend_slope) < 0.001:
trend_direction = 'stable'
elif trend_slope > 0:
trend_direction = 'increasing'
else:
trend_direction = 'decreasing'
return {
'slope': trend_slope,
'r_squared': r_squared,
'direction': trend_direction,
'start_value': data.iloc[0],
'end_value': data.iloc[-1],
'total_change': data.iloc[-1] - data.iloc[0],
'percent_change': ((data.iloc[-1] - data.iloc[0]) / data.iloc[0]) * 100 if data.iloc[0] != 0 else 0
}
def frequency_analysis(self, df: pd.DataFrame, column: str, sampling_rate: float = 1.0) -> dict:
"""Perform frequency domain analysis"""
if column not in df.columns:
return {}
data = df[column].dropna()
if len(data) < 4:
return {'error': 'Insufficient data for frequency analysis'}
# Remove DC component
data_centered = data - data.mean()
# Apply FFT
fft_values = np.fft.fft(data_centered)
frequencies = np.fft.fftfreq(len(data_centered), 1/sampling_rate)
# Get magnitude spectrum (positive frequencies only)
n = len(data_centered)
magnitude_spectrum = np.abs(fft_values[:n//2])
frequency_bins = frequencies[:n//2]
# Find dominant frequency
if len(magnitude_spectrum) > 1:
dominant_freq_idx = np.argmax(magnitude_spectrum[1:]) + 1 # Skip DC component
dominant_frequency = frequency_bins[dominant_freq_idx]
dominant_magnitude = magnitude_spectrum[dominant_freq_idx]
else:
dominant_frequency = 0
dominant_magnitude = 0
return {
'dominant_frequency': dominant_frequency,
'dominant_magnitude': dominant_magnitude,
'frequency_bins': frequency_bins,
'magnitude_spectrum': magnitude_spectrum
}
def generate_report(self, session_id: int, output_file: str = None) -> str:
"""Generate comprehensive analysis report"""
# Load data
df = self.load_session_data(session_id)
if df.empty:
return "No data found for session"
# Pivot data
pivot_df = self.pivot_sensor_data(df)
report = []
report.append("Serial Data Analysis Report")
report.append("=" * 50)
report.append(f"Session ID: {session_id}")
report.append(f"Analysis Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append(f"Data Points: {len(df)}")
report.append(f"Time Range: {df['timestamp'].min()} to {df['timestamp'].max()}")
report.append(f"Duration: {df['timestamp'].max() - df['timestamp'].min()}")
report.append("")
# Basic statistics
stats = self.basic_statistics(pivot_df)
if stats:
report.append("Basic Statistics:")
report.append("-" * 20)
for field, field_stats in stats.items():
report.append(f"\n{field}:")
for stat_name, value in field_stats.items():
report.append(f" {stat_name}: {value:.3f}")
# Correlation analysis
if len(pivot_df.columns) > 1:
corr_matrix = self.correlation_analysis(pivot_df)
report.append(f"\n\nCorrelation Matrix:")
report.append("-" * 20)
report.append(str(corr_matrix.round(3)))
# Trend analysis for each numeric field
report.append(f"\n\nTrend Analysis:")
report.append("-" * 20)
for col in pivot_df.select_dtypes(include=[np.number]).columns:
trend_info = self.trend_analysis(pivot_df.reset_index(), col)
if 'error' not in trend_info:
report.append(f"\n{col}:")
report.append(f" Direction: {trend_info['direction']}")
report.append(f" Slope: {trend_info['slope']:.6f}")
report.append(f" R-squared: {trend_info['r_squared']:.3f}")
report.append(f" Total Change: {trend_info['total_change']:.3f}")
report.append(f" Percent Change: {trend_info['percent_change']:.2f}%")
# Anomaly detection
report.append(f"\n\nAnomaly Detection:")
report.append("-" * 20)
for col in pivot_df.select_dtypes(include=[np.number]).columns:
anomalies = self.detect_anomalies(pivot_df.reset_index(), col)
report.append(f"\n{col}: {len(anomalies)} anomalies detected")
report_text = "\n".join(report)
# Save to file if requested
if output_file:
with open(output_file, 'w') as f:
f.write(report_text)
print(f"📄 Report saved to {output_file}")
return report_text
def create_visualizations(self, session_id: int, save_plots: bool = True):
"""Create comprehensive visualizations"""
# Load and pivot data
df = self.load_session_data(session_id)
pivot_df = self.pivot_sensor_data(df)
if pivot_df.empty:
print("No data to visualize")
return
numeric_cols = pivot_df.select_dtypes(include=[np.number]).columns
n_cols = len(numeric_cols)
if n_cols == 0:
print("No numeric data to visualize")
return
# Set up subplots
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle(f'Serial Data Analysis - Session {session_id}', fontsize=16)
# Time series plot
ax1 = axes[0, 0]
for col in numeric_cols[:5]: # Limit to first 5 columns
ax1.plot(pivot_df.index, pivot_df[col], label=col, alpha=0.8)
ax1.set_title('Time Series Data')
ax1.set_xlabel('Time')
ax1.set_ylabel('Values')
ax1.legend()
ax1.grid(True, alpha=0.3)
# Distribution plots
ax2 = axes[0, 1]
if n_cols >= 1:
first_col = numeric_cols[0]
ax2.hist(pivot_df[first_col].dropna(), bins=50, alpha=0.7, edgecolor='black')
ax2.set_title(f'Distribution - {first_col}')
ax2.set_xlabel('Value')
ax2.set_ylabel('Frequency')
ax2.grid(True, alpha=0.3)
# Correlation heatmap
ax3 = axes[1, 0]
if n_cols > 1:
corr_matrix = self.correlation_analysis(pivot_df)
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0, ax=ax3)
ax3.set_title('Correlation Heatmap')
else:
ax3.text(0.5, 0.5, 'Insufficient data\nfor correlation analysis',
transform=ax3.transAxes, ha='center', va='center')
ax3.set_title('Correlation Heatmap')
# Box plot for outlier detection
ax4 = axes[1, 1]
if n_cols >= 1:
box_data = [pivot_df[col].dropna() for col in numeric_cols[:5]]
ax4.boxplot(box_data, labels=[col[:10] for col in numeric_cols[:5]])
ax4.set_title('Box Plot (Outlier Detection)')
ax4.set_ylabel('Values')
ax4.tick_params(axis='x', rotation=45)
plt.tight_layout()
if save_plots:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'serial_analysis_session_{session_id}_{timestamp}.png'
plt.savefig(filename, dpi=300, bbox_inches='tight')
print(f"📊 Visualizations saved to {filename}")
plt.show()
# Example usage
analyzer = SerialDataAnalyzer('sensor_data.db')
# Generate report for session 1
report = analyzer.generate_report(1, 'analysis_report.txt')
print(report[:500] + "..." if len(report) > 500 else report)
# Create visualizations
analyzer.create_visualizations(1, save_plots=True)
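The report and visualization calls above take a session_id; to find one, list the sessions table that SerialDatabaseLogger maintains. A small sketch against the same database file:
import sqlite3

conn = sqlite3.connect('sensor_data.db')
cursor = conn.cursor()
cursor.execute('''
    SELECT session_id, port, baudrate, start_time, end_time, description
    FROM sessions
    ORDER BY start_time DESC
''')
for session_id, port, baudrate, start_time, end_time, description in cursor.fetchall():
    status = end_time or 'still running'
    print(f"#{session_id}  {port} @ {baudrate}  started {start_time}  ended {status}  {description or ''}")
conn.close()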
File Logging
Simple file-based loggers for CSV, JSON, and text formats
Database Storage
SQLite integration for efficient querying and analysis
Real-time Dashboard
GUI dashboard with live graphs and statistics
Data Analysis
Statistical analysis tools for trend and anomaly detection
Comprehensive data logging enables powerful analysis of serial data streams. From simple file logging to real-time dashboards, these patterns cover the core building blocks for scientific and industrial data collection.