PySerial
PySerialDocs

Arduino and PySerial Integration

Connect Python to Arduino using PySerial. Complete guide with wiring, code examples, and real projects for sensors, LEDs, and data logging.

Connect Python to Arduino for powerful hardware control and data collection.

Perfect Partnership: Arduino handles real-time hardware control while Python provides data processing, web interfaces, and complex logic.

Quick Setup

Arduino Code

Upload this basic communication sketch:

void setup() {
    Serial.begin(9600);
    pinMode(13, OUTPUT); // Built-in LED
}

void loop() {
    if (Serial.available()) {
        String command = Serial.readString();
        command.trim();
        
        if (command == "LED_ON") {
            digitalWrite(13, HIGH);
            Serial.println("LED ON");
        } else if (command == "LED_OFF") {
            digitalWrite(13, LOW);
            Serial.println("LED OFF");
        }
    }
}

Python Code

Control Arduino from Python:

import serial
import time

arduino = serial.Serial('/dev/ttyACM0', 9600, timeout=2)
time.sleep(2)  # Wait for Arduino reset

def send_command(cmd):
    arduino.write((cmd + '\n').encode())
    response = arduino.readline().decode().strip()
    return response

# Control LED
print(send_command("LED_ON"))   # "LED ON"
time.sleep(1)
print(send_command("LED_OFF"))  # "LED OFF"

arduino.close()

Find Arduino Port

Auto-detect Arduino:

import serial.tools.list_ports

def find_arduino():
    """Find Arduino automatically"""
    for port in serial.tools.list_ports.comports():
        if 'Arduino' in port.description or 'CH340' in port.description:
            return port.device
    return None

port = find_arduino()
if port:
    print(f"Found Arduino on {port}")
else:
    print("Arduino not found")

Data Collection Project

Build a temperature monitoring system:

Arduino Code (DHT22 sensor):

#include <DHT.h>

#define DHTPIN 2
#define DHTTYPE DHT22

DHT dht(DHTPIN, DHTTYPE);

void setup() {
    Serial.begin(9600);
    dht.begin();
}

void loop() {
    float temp = dht.readTemperature();
    float humidity = dht.readHumidity();
    
    if (!isnan(temp) && !isnan(humidity)) {
        // Send as JSON
        Serial.print("{\"temp\":");
        Serial.print(temp);
        Serial.print(",\"humidity\":");
        Serial.print(humidity);
        Serial.println("}");
    } else {
        Serial.println("ERROR:Sensor read failed");
    }
    
    delay(2000);
}

Python Data Logger:

import serial
import json
import datetime
from pathlib import Path

class ArduinoLogger:
    def __init__(self, port, log_file="sensor_data.csv"):
        self.arduino = serial.Serial(port, 9600, timeout=2)
        self.log_file = Path(log_file)
        self.init_csv()
        time.sleep(2)  # Arduino reset
    
    def init_csv(self):
        """Create CSV with headers"""
        if not self.log_file.exists():
            with open(self.log_file, 'w') as f:
                f.write("timestamp,temperature,humidity\n")
    
    def parse_data(self, line):
        """Parse Arduino JSON data"""
        if line.startswith("{"):
            try:
                return json.loads(line)
            except json.JSONDecodeError:
                pass
        return None
    
    def log_data(self, data):
        """Save to CSV"""
        timestamp = datetime.datetime.now()
        with open(self.log_file, 'a') as f:
            f.write(f"{timestamp.isoformat()},{data['temp']},{data['humidity']}\n")
        
        print(f"[{timestamp:%H:%M:%S}] {data['temp']:.1f}°C, {data['humidity']:.1f}%")
    
    def run(self):
        """Main logging loop"""
        print("Temperature logger started...")
        try:
            while True:
                line = self.arduino.readline().decode().strip()
                if line:
                    if line.startswith("ERROR"):
                        print(f"Arduino error: {line}")
                    else:
                        data = self.parse_data(line)
                        if data:
                            self.log_data(data)
        except KeyboardInterrupt:
            print("\nLogging stopped")
        finally:
            self.arduino.close()

# Run logger
logger = ArduinoLogger('/dev/ttyACM0')
logger.run()

SQLite Integration:

import sqlite3
import serial
import json
import time

class DatabaseLogger:
    def __init__(self, port, db_file="sensors.db"):
        self.arduino = serial.Serial(port, 9600, timeout=1)
        self.db_file = db_file
        self.init_database()
        time.sleep(2)
    
    def init_database(self):
        """Create database table"""
        conn = sqlite3.connect(self.db_file)
        conn.execute('''
            CREATE TABLE IF NOT EXISTS readings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                temperature REAL,
                humidity REAL
            )
        ''')
        conn.commit()
        conn.close()
    
    def save_reading(self, data):
        """Save to database"""
        conn = sqlite3.connect(self.db_file)
        conn.execute(
            'INSERT INTO readings (temperature, humidity) VALUES (?, ?)',
            (data['temp'], data['humidity'])
        )
        conn.commit()
        conn.close()
    
    def get_latest(self, count=10):
        """Get recent readings"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.execute(
            'SELECT * FROM readings ORDER BY timestamp DESC LIMIT ?',
            (count,)
        )
        data = cursor.fetchall()
        conn.close()
        return data

# Usage
logger = DatabaseLogger('/dev/ttyACM0')
# ... logging code ...
recent = logger.get_latest(5)
for reading in recent:
    print(f"{reading[1]}: {reading[2]:.1f}°C")

LED Control System

Create a multi-LED controller:

Arduino Code:

// Control 8 LEDs
const int LED_PINS[] = {2, 3, 4, 5, 6, 7, 8, 9};
const int NUM_LEDS = 8;

void setup() {
    Serial.begin(9600);
    for (int i = 0; i < NUM_LEDS; i++) {
        pinMode(LED_PINS[i], OUTPUT);
        digitalWrite(LED_PINS[i], LOW);
    }
    Serial.println("READY");
}

void loop() {
    if (Serial.available()) {
        String command = Serial.readString();
        command.trim();
        
        if (command.startsWith("LED")) {
            // Format: LED<num>:<ON|OFF>
            int colonPos = command.indexOf(':');
            if (colonPos > 0) {
                int ledNum = command.substring(3, colonPos).toInt();
                String state = command.substring(colonPos + 1);
                
                if (ledNum >= 0 && ledNum < NUM_LEDS) {
                    digitalWrite(LED_PINS[ledNum], state == "ON" ? HIGH : LOW);
                    Serial.println("OK");
                } else {
                    Serial.println("ERROR:Invalid LED");
                }
            }
        } else if (command == "ALL_OFF") {
            for (int i = 0; i < NUM_LEDS; i++) {
                digitalWrite(LED_PINS[i], LOW);
            }
            Serial.println("OK");
        }
    }
}

Python Controller:

import serial
import time

class LEDController:
    def __init__(self, port):
        self.arduino = serial.Serial(port, 9600, timeout=1)
        time.sleep(2)
        
        # Wait for Arduino ready
        while True:
            line = self.arduino.readline().decode().strip()
            if line == "READY":
                break
    
    def set_led(self, led_num, state):
        """Control individual LED"""
        command = f"LED{led_num}:{state}"
        self.arduino.write(command.encode() + b'\n')
        response = self.arduino.readline().decode().strip()
        return response == "OK"
    
    def all_off(self):
        """Turn all LEDs off"""
        self.arduino.write(b"ALL_OFF\n")
        return self.arduino.readline().decode().strip() == "OK"
    
    def pattern_wave(self, delay=0.1):
        """Create wave pattern"""
        for i in range(8):
            self.set_led(i, "ON")
            time.sleep(delay)
            self.set_led(i, "OFF")
    
    def pattern_binary(self, number, delay=1):
        """Display number in binary"""
        self.all_off()
        for i in range(8):
            if number & (1 << i):
                self.set_led(i, "ON")
        time.sleep(delay)
    
    def close(self):
        self.all_off()
        self.arduino.close()

# Demo patterns
controller = LEDController('/dev/ttyACM0')

# Wave pattern
for _ in range(3):
    controller.pattern_wave()

# Count in binary
for i in range(16):
    controller.pattern_binary(i, 0.5)

controller.close()

High-Speed Data Acquisition

Stream sensor data at high rates:

Fast Sampling

Arduino samples at high frequency

Buffered Transfer

Send data in batches

Python Processing

Real-time analysis and storage

Arduino Multi-Sensor:

void setup() {
    Serial.begin(115200);  // Higher baud rate
}

void loop() {
    // Read multiple sensors quickly
    int light = analogRead(A0);
    int temp_raw = analogRead(A1);
    int pressure_raw = analogRead(A2);
    
    // Send compact CSV format
    Serial.print(millis());
    Serial.print(",");
    Serial.print(temp_raw);
    Serial.print(",");
    Serial.print(light);
    Serial.print(",");
    Serial.println(pressure_raw);
    
    delay(10);  // 100 Hz sampling
}

Python High-Speed Reader:

import serial
import time
import sqlite3
from collections import deque

class HighSpeedDAQ:
    def __init__(self, port, db_file="daq_data.db"):
        self.arduino = serial.Serial(port, 115200, timeout=0.1)
        self.db_file = db_file
        self.buffer = deque(maxlen=10000)
        self.init_database()
        time.sleep(2)
    
    def init_database(self):
        conn = sqlite3.connect(self.db_file)
        conn.execute('''
            CREATE TABLE IF NOT EXISTS readings (
                timestamp INTEGER,
                temperature INTEGER,
                light INTEGER,
                pressure INTEGER
            )
        ''')
        conn.commit()
        conn.close()
    
    def parse_line(self, line):
        """Parse CSV data from Arduino"""
        try:
            parts = line.split(',')
            return {
                'timestamp': int(parts[0]),
                'temperature': int(parts[1]),
                'light': int(parts[2]),
                'pressure': int(parts[3])
            }
        except (ValueError, IndexError):
            return None
    
    def save_batch(self, batch):
        """Save batch to database"""
        conn = sqlite3.connect(self.db_file)
        conn.executemany(
            'INSERT INTO readings VALUES (?, ?, ?, ?)',
            [(d['timestamp'], d['temperature'], d['light'], d['pressure']) 
             for d in batch]
        )
        conn.commit()
        conn.close()
    
    def collect_data(self, duration=60):
        """High-speed data collection"""
        start_time = time.time()
        batch = []
        
        print(f"Collecting data for {duration} seconds...")
        
        while time.time() - start_time < duration:
            if self.arduino.in_waiting:
                line = self.arduino.readline().decode().strip()
                if line:
                    data = self.parse_line(line)
                    if data:
                        batch.append(data)
                        self.buffer.append(data)
                        
                        # Save every 100 readings
                        if len(batch) >= 100:
                            self.save_batch(batch)
                            print(f"Saved {len(batch)} readings")
                            batch = []
        
        # Save remaining data
        if batch:
            self.save_batch(batch)
        
        print(f"Collection complete. {len(self.buffer)} total readings")
    
    def get_stats(self):
        """Calculate statistics"""
        if not self.buffer:
            return None
        
        temps = [d['temperature'] for d in self.buffer]
        return {
            'count': len(self.buffer),
            'temp_avg': sum(temps) / len(temps),
            'temp_min': min(temps),
            'temp_max': max(temps),
            'sample_rate': len(self.buffer) / 60  # samples per second
        }
    
    def close(self):
        self.arduino.close()

# Run high-speed acquisition
daq = HighSpeedDAQ('/dev/ttyACM0')
daq.collect_data(120)  # 2 minutes

stats = daq.get_stats()
if stats:
    print(f"Sample rate: {stats['sample_rate']:.1f} Hz")
    print(f"Temperature range: {stats['temp_min']}-{stats['temp_max']}")

daq.close()

Connection Troubleshooting

Arduino Reset Issue: Arduino resets when PySerial connects. Always wait 2 seconds after opening the port.

Reliable Connection:

import serial
import time

def connect_arduino_reliable(port, baudrate=9600, timeout=5):
    """Establish reliable Arduino connection"""
    try:
        arduino = serial.Serial(port, baudrate, timeout=1)
        time.sleep(2)  # Wait for reset
        
        # Clear any boot messages
        arduino.reset_input_buffer()
        
        # Test communication with ping/pong
        arduino.write(b"PING\n")
        start = time.time()
        
        while time.time() - start < timeout:
            line = arduino.readline().decode().strip()
            if line == "PONG":
                print("Arduino connected and ready")
                return arduino
            time.sleep(0.1)
        
        arduino.close()
        raise Exception("Arduino not responding to PING")
        
    except serial.SerialException as e:
        raise Exception(f"Connection failed: {e}")

# Arduino should respond to PING with PONG
arduino = connect_arduino_reliable('/dev/ttyACM0')

Add to Arduino setup():

// In your Arduino code
void setup() {
    Serial.begin(9600);
    // ... other setup ...
}

void loop() {
    if (Serial.available()) {
        String cmd = Serial.readString();
        cmd.trim();
        
        if (cmd == "PING") {
            Serial.println("PONG");
        }
        // ... handle other commands ...
    }
}

Best Practices

Error Handling

Always handle serial exceptions and timeouts

Reset Timing

Wait 2 seconds after opening Arduino connection

Data Format

Use JSON or CSV for structured data exchange

Flow Control

Implement handshaking for reliable communication

Production-Ready Pattern:

import serial
import json
import logging
import time
from typing import Optional

class ArduinoInterface:
    """Production-ready Arduino interface"""
    
    def __init__(self, port: str, baudrate: int = 9600):
        self.port = port
        self.baudrate = baudrate
        self.arduino: Optional[serial.Serial] = None
        self.logger = logging.getLogger(__name__)
        
    def connect(self) -> bool:
        """Connect with retry logic"""
        for attempt in range(3):
            try:
                self.arduino = serial.Serial(
                    self.port, 
                    self.baudrate, 
                    timeout=2
                )
                time.sleep(2)  # Arduino reset
                self.arduino.reset_input_buffer()
                
                # Test connection
                if self._test_connection():
                    self.logger.info(f"Connected to Arduino on {self.port}")
                    return True
                    
            except Exception as e:
                self.logger.warning(f"Connection attempt {attempt + 1} failed: {e}")
                time.sleep(1)
        
        return False
    
    def _test_connection(self) -> bool:
        """Test if Arduino is responding"""
        try:
            self.send_command("PING")
            response = self.read_response(timeout=5)
            return response == "PONG"
        except:
            return False
    
    def send_command(self, command: str) -> bool:
        """Send command to Arduino"""
        if not self.arduino:
            return False
        
        try:
            self.arduino.write((command + '\n').encode())
            return True
        except Exception as e:
            self.logger.error(f"Send error: {e}")
            return False
    
    def read_response(self, timeout: float = 2) -> Optional[str]:
        """Read response with timeout"""
        if not self.arduino:
            return None
        
        old_timeout = self.arduino.timeout
        self.arduino.timeout = timeout
        
        try:
            response = self.arduino.readline().decode().strip()
            return response if response else None
        except Exception as e:
            self.logger.error(f"Read error: {e}")
            return None
        finally:
            self.arduino.timeout = old_timeout
    
    def close(self):
        """Clean shutdown"""
        if self.arduino:
            self.arduino.close()
            self.arduino = None

# Usage
arduino = ArduinoInterface('/dev/ttyACM0')
if arduino.connect():
    arduino.send_command("LED_ON")
    response = arduino.read_response()
    print(f"Arduino says: {response}")
    arduino.close()

Next Steps

Arduino + Python = Powerful Combo: You now have the tools to build sophisticated hardware projects with Python's data processing and Arduino's real-time control.

Arduino and PySerial integration opens endless possibilities for IoT, automation, and data collection projects.

How is this guide?