PySerial
PySerialDocs

Data Logging

Log serial data to CSV and SQLite with PySerial. Capture sensor readings, timestamp entries, and query stored data in Python.

Log serial port data to files and databases for analysis, compliance, or long-term monitoring.

Log to CSV

A single function that reads lines from a serial port, timestamps them, and writes CSV:

import serial
import csv
import time
from datetime import datetime

def log_serial_to_csv(port, baudrate=9600, csv_path="serial_log.csv", duration=60):
    """Read lines from a serial port for ``duration`` seconds and log them to CSV.

    Each CSV row is ``[ISO-8601 timestamp, raw line]``. Blank reads (the
    1-second ``readline`` timeout expiring with no data) are skipped.

    Args:
        port: Serial device path, e.g. "/dev/ttyUSB0".
        baudrate: Line speed; must match the device.
        csv_path: Output CSV file, overwritten on each run.
        duration: Capture window in seconds.

    Returns:
        The number of lines logged.
    """
    count = 0
    ser = serial.Serial(port, baudrate, timeout=1)
    # try/finally guarantees the port is released even if the file write
    # or decode raises — the original only closed it on the success path.
    try:
        with open(csv_path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["timestamp", "raw_data"])

            start = time.time()
            while time.time() - start < duration:
                line = ser.readline().decode("utf-8", errors="ignore").strip()
                if not line:
                    continue

                writer.writerow([datetime.now().isoformat(), line])
                count += 1

                # Flush in batches: a crash loses at most ~100 rows without
                # paying a flush cost on every single line.
                if count % 100 == 0:
                    f.flush()
    finally:
        ser.close()

    print(f"Logged {count} lines to {csv_path}")
    return count

# Example: capture 5 minutes (duration=300) at 115200 baud; blocks until done.
log_serial_to_csv("/dev/ttyUSB0", 115200, duration=300)

Log serial test data automatically

TofuPilot records test results from your PySerial scripts, tracks pass/fail rates, and generates compliance reports. Free to start.

Parse Structured Data

If your device sends CSV or JSON, parse before logging to get typed columns:

import serial
import csv
import json
import time
from datetime import datetime

def log_structured_data(port, baudrate=9600, csv_path="sensor_data.csv", duration=60):
    """Log structured serial data (JSON or CSV) to a typed CSV file.

    Each incoming line is parsed as JSON; on failure it is split on commas
    into ``field_0, field_1, ...`` columns. The CSV header is fixed by the
    keys of the *first* message received; a "timestamp" column is prepended.

    Args:
        port: Serial device path, e.g. "/dev/ttyUSB0".
        baudrate: Line speed; must match the device.
        csv_path: Output CSV file, overwritten on each run.
        duration: Capture window in seconds.

    Returns:
        The number of rows logged.
    """
    ser = serial.Serial(port, baudrate, timeout=1)
    count = 0
    # try/finally guarantees the port is released even if parsing or
    # writing raises mid-run.
    try:
        with open(csv_path, "w", newline="") as f:
            writer = None
            start = time.time()

            while time.time() - start < duration:
                line = ser.readline().decode("utf-8", errors="ignore").strip()
                if not line:
                    continue

                # Try JSON first, fall back to CSV split
                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    parts = line.split(",")
                    data = {f"field_{i}": v.strip() for i, v in enumerate(parts)}

                if writer is None:
                    columns = ["timestamp"] + list(data.keys())
                    # extrasaction="ignore": if a later message carries keys
                    # the first one didn't, drop them instead of letting
                    # DictWriter raise ValueError and abort the whole run.
                    writer = csv.DictWriter(f, fieldnames=columns, extrasaction="ignore")
                    writer.writeheader()

                writer.writerow({"timestamp": datetime.now().isoformat(), **data})
                count += 1

                # Periodic flush so a crash loses at most ~100 rows.
                if count % 100 == 0:
                    f.flush()
    finally:
        ser.close()

    print(f"Logged {count} structured rows to {csv_path}")
    return count

# Example: capture 2 minutes (duration=120) of structured data at 9600 baud.
log_structured_data("/dev/ttyUSB0", 9600, duration=120)

Log to SQLite

For queryable storage, write serial data to SQLite:

import serial
import sqlite3
import json
import time
from datetime import datetime

def log_serial_to_sqlite(port, baudrate=9600, db_path="serial_data.db", duration=60):
    """Log serial lines into a SQLite ``readings`` table for ``duration`` seconds.

    Each row stores an ISO-8601 timestamp, the raw line, and — when the line
    is valid JSON — a normalized JSON copy in ``parsed_json`` (NULL otherwise).
    Rows are inserted in batches of 50 to keep commit overhead low.

    Args:
        port: Serial device path, e.g. "/dev/ttyUSB0".
        baudrate: Line speed; must match the device.
        db_path: SQLite database file; the table is created if missing.
        duration: Capture window in seconds.

    Returns:
        The number of rows logged.
    """
    # Single definition of the statement — the original duplicated it for
    # the batch flush and the final flush.
    insert_sql = "INSERT INTO readings (timestamp, raw_data, parsed_json) VALUES (?, ?, ?)"

    count = 0
    ser = serial.Serial(port, baudrate, timeout=1)
    # Nested try/finally: both the port and the DB connection were leaked
    # in the original if anything raised mid-run.
    try:
        conn = sqlite3.connect(db_path)
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS readings (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    raw_data TEXT NOT NULL,
                    parsed_json TEXT
                )
            """)
            conn.commit()

            start = time.time()
            batch = []

            while time.time() - start < duration:
                line = ser.readline().decode("utf-8", errors="ignore").strip()
                if not line:
                    continue

                # Store a re-serialized JSON copy when the line parses;
                # NULL marks lines that were not JSON.
                parsed = None
                try:
                    parsed = json.dumps(json.loads(line))
                except json.JSONDecodeError:
                    pass

                batch.append((datetime.now().isoformat(), line, parsed))
                count += 1

                # Flush every 50 rows
                if len(batch) >= 50:
                    conn.executemany(insert_sql, batch)
                    conn.commit()
                    batch = []

            # Flush remaining
            if batch:
                conn.executemany(insert_sql, batch)
                conn.commit()
        finally:
            conn.close()
    finally:
        ser.close()

    print(f"Logged {count} rows to {db_path}")
    return count

# Example: capture 5 minutes (duration=300) at 115200 baud into serial_data.db.
log_serial_to_sqlite("/dev/ttyUSB0", 115200, duration=300)

Query Logged Data

Once data is in SQLite, query it directly:

import sqlite3

conn = sqlite3.connect("serial_data.db")

# Show the 10 most recently inserted rows (highest autoincrement id first).
cursor = conn.execute(
    "SELECT timestamp, raw_data FROM readings ORDER BY id DESC LIMIT 10"
)
for ts, data in cursor.fetchall():
    print(f"{ts}: {data}")

# Bucket readings per hour: the first 13 characters of an ISO-8601
# timestamp cover YYYY-MM-DDTHH.
cursor = conn.execute("""
    SELECT substr(timestamp, 1, 13) AS hour, COUNT(*) AS cnt
    FROM readings
    GROUP BY hour
    ORDER BY hour
""")
for hour, cnt in cursor.fetchall():
    print(f"{hour}: {cnt} readings")

conn.close()

Choosing a Storage Format

| Format     | Best For                             | Tradeoffs                            |
| ---------- | ------------------------------------ | ------------------------------------ |
| CSV        | Quick capture, spreadsheet import    | No queries, append-only, no schema   |
| SQLite     | Queryable history, long-running logs | Slightly more setup, file locking    |
| JSON lines | Structured data, log shipping        | Large files, slower to query locally |

For high-throughput logging (1000+ lines/sec), batch your writes. The SQLite example above flushes every 50 rows. For CSV, call f.flush() periodically instead of on every line.