Arduino and PySerial Integration
Connect Python to Arduino using PySerial. Complete guide with wiring, code examples, and real projects for sensors, LEDs, and data logging.
Connect Python to Arduino for powerful hardware control and data collection.
Perfect Partnership: Arduino handles real-time hardware control while Python provides data processing, web interfaces, and complex logic.
Quick Setup
Arduino Code
Upload this basic communication sketch:
// One-time initialisation: configure the LED pin and open the serial link.
void setup() {
  const int ledPin = 13;  // Built-in LED
  pinMode(ledPin, OUTPUT);
  Serial.begin(9600);
}
// Reads one newline-terminated command per pass and switches the LED on pin 13.
// Replies "LED ON" / "LED OFF"; any other command is ignored without a reply.
void loop() {
  if (Serial.available()) {
    // readStringUntil('\n') returns as soon as the terminator sent by the
    // Python side arrives. readString() would instead wait for the serial
    // timeout to expire before every reply.
    String command = Serial.readStringUntil('\n');
    command.trim();  // drops a trailing '\r' and surrounding whitespace
    if (command == "LED_ON") {
      digitalWrite(13, HIGH);
      Serial.println("LED ON");
    } else if (command == "LED_OFF") {
      digitalWrite(13, LOW);
      Serial.println("LED OFF");
    }
  }
}
Python Code
Control Arduino from Python:
import serial
import time
arduino = serial.Serial('/dev/ttyACM0', 9600, timeout=2)


def send_command(cmd):
    """Send one newline-terminated command and return the reply line.

    Returns an empty string when no reply arrives within the port timeout.
    """
    arduino.write((cmd + '\n').encode())
    # errors='replace' keeps stray non-UTF-8 bytes from raising mid-session.
    response = arduino.readline().decode(errors='replace').strip()
    return response


try:
    time.sleep(2)  # Wait for Arduino reset
    # Control LED
    print(send_command("LED_ON"))  # "LED ON"
    time.sleep(1)
    print(send_command("LED_OFF"))  # "LED OFF"
finally:
    # Release the port even if a write/read above fails or is interrupted.
    arduino.close()
Find Arduino Port
Auto-detect Arduino:
import serial.tools.list_ports
def find_arduino():
    """Return the device name of the first serial port that looks like an Arduino.

    A port matches when its description contains 'Arduino' or 'CH340'.
    Returns None when no port matches.
    """
    markers = ('Arduino', 'CH340')
    matches = (
        info.device
        for info in serial.tools.list_ports.comports()
        if any(marker in info.description for marker in markers)
    )
    return next(matches, None)
# Report the outcome of the auto-detection.
port = find_arduino()
print(f"Found Arduino on {port}" if port else "Arduino not found")
Data Collection Project
Build a temperature monitoring system:
Arduino Code (DHT22 sensor):
#include <DHT.h>
#define DHTPIN 2
#define DHTTYPE DHT22
DHT dht(DHTPIN, DHTTYPE);
// One-time initialisation: start the DHT sensor driver and the serial link.
void setup() {
  dht.begin();
  Serial.begin(9600);
}
// Every 2 seconds: read the sensor and print either one JSON object line
// or an "ERROR:" line when a value could not be read.
void loop() {
  float temp = dht.readTemperature();
  float humidity = dht.readHumidity();
  const bool readFailed = isnan(temp) || isnan(humidity);
  if (readFailed) {
    Serial.println("ERROR:Sensor read failed");
  } else {
    // Send as JSON
    Serial.print("{\"temp\":");
    Serial.print(temp);
    Serial.print(",\"humidity\":");
    Serial.print(humidity);
    Serial.println("}");
  }
  delay(2000);
}
Python Data Logger:
import datetime
import json
import time
from pathlib import Path

import serial
class ArduinoLogger:
    """Append JSON temperature/humidity readings from an Arduino to a CSV file."""

    def __init__(self, port, log_file="sensor_data.csv"):
        """Open the serial port and make sure the CSV file has a header row.

        Args:
            port: Serial device name, e.g. '/dev/ttyACM0'.
            log_file: Path of the CSV file readings are appended to.
        """
        self.arduino = serial.Serial(port, 9600, timeout=2)
        self.log_file = Path(log_file)
        try:
            self.init_csv()
        except OSError:
            # Do not leave the port open when the log file cannot be created.
            self.arduino.close()
            raise
        time.sleep(2)  # Arduino reset

    def init_csv(self):
        """Create CSV with headers if the file does not exist yet."""
        if not self.log_file.exists():
            with open(self.log_file, 'w') as f:
                f.write("timestamp,temperature,humidity\n")

    def parse_data(self, line):
        """Parse Arduino JSON data.

        Returns the decoded object, or None when the line does not start
        with '{' or is not valid JSON.
        """
        if line.startswith("{"):
            try:
                return json.loads(line)
            except json.JSONDecodeError:
                pass
        return None

    def log_data(self, data):
        """Append one reading to the CSV file and echo it to stdout.

        Args:
            data: Mapping with numeric 'temp' and 'humidity' entries.

        Raises:
            KeyError: A required key is missing.
            TypeError, ValueError: A value cannot be formatted as a number.
        """
        timestamp = datetime.datetime.now()
        temp = data['temp']
        humidity = data['humidity']
        # Build both strings before touching the file so that a malformed
        # reading fails here and never leaves a row behind.
        row = f"{timestamp.isoformat()},{temp},{humidity}\n"
        summary = f"[{timestamp:%H:%M:%S}] {temp:.1f}°C, {humidity:.1f}%"
        with open(self.log_file, 'a') as f:
            f.write(row)
        print(summary)

    def run(self):
        """Main logging loop: read lines until Ctrl+C, then close the port."""
        print("Temperature logger started...")
        try:
            while True:
                # errors='replace': stray non-UTF-8 bytes must not end the loop.
                line = self.arduino.readline().decode(errors='replace').strip()
                if not line:
                    continue  # nothing arrived within the read timeout
                if line.startswith("ERROR"):
                    print(f"Arduino error: {line}")
                    continue
                data = self.parse_data(line)
                if not data:
                    continue
                try:
                    self.log_data(data)
                except (KeyError, TypeError, ValueError) as exc:
                    print(f"Skipping malformed reading {line!r}: {exc}")
        except KeyboardInterrupt:
            print("\nLogging stopped")
        finally:
            self.arduino.close()
# Run logger (run() loops until interrupted with Ctrl+C)
logger = ArduinoLogger(port='/dev/ttyACM0')
logger.run()
SQLite Integration:
import sqlite3
import serial
import json
import time
class DatabaseLogger:
    """Store Arduino temperature/humidity readings in an SQLite database."""

    def __init__(self, port, db_file="sensors.db"):
        """Open the serial port and create the readings table if needed.

        Args:
            port: Serial device name, e.g. '/dev/ttyACM0'.
            db_file: Path of the SQLite database file.
        """
        self.arduino = serial.Serial(port, 9600, timeout=1)
        self.db_file = db_file
        try:
            self.init_database()
        except sqlite3.Error:
            # Do not leave the port open when the database is unusable.
            self.arduino.close()
            raise
        time.sleep(2)

    def init_database(self):
        """Create database table"""
        conn = sqlite3.connect(self.db_file)
        try:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS readings (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    temperature REAL,
                    humidity REAL
                )
            ''')
            conn.commit()
        finally:
            conn.close()

    def save_reading(self, data):
        """Save to database.

        Args:
            data: Mapping with 'temp' and 'humidity' entries.
        """
        conn = sqlite3.connect(self.db_file)
        try:
            conn.execute(
                'INSERT INTO readings (temperature, humidity) VALUES (?, ?)',
                (data['temp'], data['humidity'])
            )
            conn.commit()
        finally:
            conn.close()

    def get_latest(self, count=10):
        """Get recent readings, newest first.

        Returns:
            List of (id, timestamp, temperature, humidity) rows.
        """
        conn = sqlite3.connect(self.db_file)
        try:
            # CURRENT_TIMESTAMP has one-second resolution, so the id is used
            # as a tie-breaker to keep rows from the same second in order.
            cursor = conn.execute(
                'SELECT * FROM readings ORDER BY timestamp DESC, id DESC LIMIT ?',
                (count,)
            )
            return cursor.fetchall()
        finally:
            conn.close()
# Usage
logger = DatabaseLogger('/dev/ttyACM0')
# ... logging code ...
recent = logger.get_latest(5)
# Each row is (id, timestamp, temperature, humidity).
for _row_id, stamp, temperature, _humidity in recent:
    print(f"{stamp}: {temperature:.1f}°C")
LED Control System
Create a multi-LED controller:
Arduino Code:
// Control 8 LEDs
const int LED_PINS[] = {2, 3, 4, 5, 6, 7, 8, 9};
// Derived from the array so the count cannot drift from the pin list.
const int NUM_LEDS = sizeof(LED_PINS) / sizeof(LED_PINS[0]);
// One-time initialisation: open the serial link, switch every LED pin to
// output and off, then announce readiness to the host.
void setup() {
  Serial.begin(9600);
  for (int pin : LED_PINS) {
    pinMode(pin, OUTPUT);
    digitalWrite(pin, LOW);
  }
  Serial.println("READY");
}
void loop() {
if (Serial.available()) {
String command = Serial.readString();
command.trim();
if (command.startsWith("LED")) {
// Format: LED<num>:<ON|OFF>
int colonPos = command.indexOf(':');
if (colonPos > 0) {
int ledNum = command.substring(3, colonPos).toInt();
String state = command.substring(colonPos + 1);
if (ledNum >= 0 && ledNum < NUM_LEDS) {
digitalWrite(LED_PINS[ledNum], state == "ON" ? HIGH : LOW);
Serial.println("OK");
} else {
Serial.println("ERROR:Invalid LED");
}
}
} else if (command == "ALL_OFF") {
for (int i = 0; i < NUM_LEDS; i++) {
digitalWrite(LED_PINS[i], LOW);
}
Serial.println("OK");
}
}
}
Python Controller:
import serial
import time
class LEDController:
    """Drive the LEDs of the multi-LED Arduino sketch over a serial link."""

    # Number of LEDs addressed by the pattern helpers.
    NUM_LEDS = 8

    def __init__(self, port, ready_timeout=10):
        """Open the port and wait for the sketch to announce "READY".

        Args:
            port: Serial device name, e.g. '/dev/ttyACM0'.
            ready_timeout: Seconds to wait for "READY" before giving up.

        Raises:
            TimeoutError: "READY" was not received in time; the port is
                closed before raising.
        """
        self.arduino = serial.Serial(port, 9600, timeout=1)
        time.sleep(2)
        # Wait for Arduino ready, but never forever.
        deadline = time.monotonic() + ready_timeout
        while self._read_line() != "READY":
            if time.monotonic() >= deadline:
                self.arduino.close()
                raise TimeoutError(
                    f"No READY from Arduino within {ready_timeout} seconds"
                )

    def _read_line(self):
        """Return one stripped reply line; empty string on read timeout."""
        # errors='replace' keeps stray non-UTF-8 bytes from raising.
        return self.arduino.readline().decode(errors='replace').strip()

    def set_led(self, led_num, state):
        """Control individual LED.

        Args:
            led_num: Zero-based LED index.
            state: "ON" or "OFF".

        Returns:
            True when the Arduino replied "OK".
        """
        command = f"LED{led_num}:{state}"
        self.arduino.write(command.encode() + b'\n')
        return self._read_line() == "OK"

    def all_off(self):
        """Turn all LEDs off; returns True when the Arduino replied "OK"."""
        self.arduino.write(b"ALL_OFF\n")
        return self._read_line() == "OK"

    def pattern_wave(self, delay=0.1):
        """Create wave pattern: light each LED in turn for `delay` seconds."""
        for i in range(self.NUM_LEDS):
            self.set_led(i, "ON")
            time.sleep(delay)
            self.set_led(i, "OFF")

    def pattern_binary(self, number, delay=1):
        """Display number in binary (bit i on LED i), then hold for `delay`."""
        self.all_off()
        for i in range(self.NUM_LEDS):
            if number & (1 << i):
                self.set_led(i, "ON")
        time.sleep(delay)

    def close(self):
        """Switch all LEDs off and close the port."""
        try:
            self.all_off()
        finally:
            # Close the port even when the final command fails.
            self.arduino.close()
# Demo patterns
controller = LEDController('/dev/ttyACM0')
try:
    # Wave pattern
    for _ in range(3):
        controller.pattern_wave()
    # Count in binary
    for i in range(16):
        controller.pattern_binary(i, 0.5)
finally:
    # Always switch the LEDs off and release the port, even on Ctrl+C.
    controller.close()
High-Speed Data Acquisition
Stream sensor data at high rates:
Fast Sampling
Arduino samples at high frequency
Buffered Transfer
Send data in batches
Python Processing
Real-time analysis and storage
Arduino Multi-Sensor:
// One-time initialisation: open the serial link.
void setup() {
  const unsigned long BAUD_RATE = 115200;  // Higher baud rate
  Serial.begin(BAUD_RATE);
}
// Samples three analog inputs every 10 ms and prints one CSV line:
//   <millis>,<temp_raw>,<light>,<pressure_raw>
void loop() {
  // Pace by elapsed time instead of delay(10): delay() adds the time spent
  // reading and printing on top of the 10 ms, so the real rate would fall
  // below 100 Hz. Unsigned subtraction stays correct when millis() wraps.
  static unsigned long lastSampleMs = 0;
  const unsigned long SAMPLE_INTERVAL_MS = 10;  // 100 Hz sampling
  const unsigned long now = millis();
  if (now - lastSampleMs < SAMPLE_INTERVAL_MS) {
    return;
  }
  lastSampleMs = now;

  // Read multiple sensors quickly
  int light = analogRead(A0);
  int temp_raw = analogRead(A1);
  int pressure_raw = analogRead(A2);

  // Send compact CSV format
  Serial.print(now);
  Serial.print(",");
  Serial.print(temp_raw);
  Serial.print(",");
  Serial.print(light);
  Serial.print(",");
  Serial.println(pressure_raw);
}
Python High-Speed Reader:
import serial
import time
import sqlite3
from collections import deque
class HighSpeedDAQ:
    """Read CSV sensor lines from an Arduino and store them in SQLite.

    The most recent readings (up to 10000) are also kept in `self.buffer`
    for statistics.
    """

    def __init__(self, port, db_file="daq_data.db"):
        """Open the serial port and create the readings table if needed.

        Args:
            port: Serial device name, e.g. '/dev/ttyACM0'.
            db_file: Path of the SQLite database file.
        """
        self.arduino = serial.Serial(port, 115200, timeout=0.1)
        self.db_file = db_file
        self.buffer = deque(maxlen=10000)
        try:
            self.init_database()
        except sqlite3.Error:
            # Do not leave the port open when the database is unusable.
            self.arduino.close()
            raise
        time.sleep(2)

    def init_database(self):
        """Create the readings table if it does not exist."""
        conn = sqlite3.connect(self.db_file)
        try:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS readings (
                    timestamp INTEGER,
                    temperature INTEGER,
                    light INTEGER,
                    pressure INTEGER
                )
            ''')
            conn.commit()
        finally:
            conn.close()

    def parse_line(self, line):
        """Parse CSV data from Arduino.

        Expects "timestamp,temperature,light,pressure" as integers.
        Returns a dict with those keys, or None when the line is malformed.
        """
        try:
            parts = line.split(',')
            return {
                'timestamp': int(parts[0]),
                'temperature': int(parts[1]),
                'light': int(parts[2]),
                'pressure': int(parts[3])
            }
        except (ValueError, IndexError):
            return None

    def save_batch(self, batch):
        """Save batch to database in a single transaction."""
        conn = sqlite3.connect(self.db_file)
        try:
            conn.executemany(
                'INSERT INTO readings VALUES (?, ?, ?, ?)',
                [(d['timestamp'], d['temperature'], d['light'], d['pressure'])
                 for d in batch]
            )
            conn.commit()
        finally:
            conn.close()

    def collect_data(self, duration=60):
        """High-speed data collection.

        Reads lines for `duration` seconds, writing to the database in
        batches of 100 readings.
        """
        start_time = time.monotonic()
        batch = []
        total = 0  # counted separately: self.buffer drops old entries when full
        print(f"Collecting data for {duration} seconds...")
        while time.monotonic() - start_time < duration:
            # readline() waits up to the port timeout, so the loop idles
            # instead of spinning on in_waiting while no data is pending.
            line = self.arduino.readline().decode(errors='replace').strip()
            if not line:
                continue
            data = self.parse_line(line)
            if data is None:
                continue
            batch.append(data)
            self.buffer.append(data)
            total += 1
            # Save every 100 readings
            if len(batch) >= 100:
                self.save_batch(batch)
                print(f"Saved {len(batch)} readings")
                batch = []
        # Save remaining data
        if batch:
            self.save_batch(batch)
        print(f"Collection complete. {total} total readings")

    def get_stats(self):
        """Calculate statistics over the buffered readings.

        Returns:
            None when the buffer is empty, otherwise a dict with 'count',
            'temp_avg', 'temp_min', 'temp_max' and 'sample_rate' (samples
            per second; 0.0 when it cannot be determined).
        """
        if not self.buffer:
            return None
        temps = [d['temperature'] for d in self.buffer]
        count = len(self.buffer)
        # Derive the rate from the readings' own millisecond timestamps
        # rather than assuming a fixed collection duration.
        span_s = (self.buffer[-1]['timestamp'] - self.buffer[0]['timestamp']) / 1000
        sample_rate = (count - 1) / span_s if span_s > 0 else 0.0
        return {
            'count': count,
            'temp_avg': sum(temps) / len(temps),
            'temp_min': min(temps),
            'temp_max': max(temps),
            'sample_rate': sample_rate  # samples per second
        }

    def close(self):
        """Close the serial port."""
        self.arduino.close()
# Run high-speed acquisition
daq = HighSpeedDAQ('/dev/ttyACM0')
try:
    daq.collect_data(120)  # 2 minutes
    stats = daq.get_stats()
    if stats:
        print(f"Sample rate: {stats['sample_rate']:.1f} Hz")
        print(f"Temperature range: {stats['temp_min']}-{stats['temp_max']}")
finally:
    # Release the port even if collection fails or is interrupted.
    daq.close()
Connection Troubleshooting
Arduino Reset Issue: Arduino resets when PySerial connects. Always wait 2 seconds after opening the port.
Reliable Connection:
import serial
import time
def connect_arduino_reliable(port, baudrate=9600, timeout=5):
    """Establish reliable Arduino connection.

    Opens the port, waits for the board reset, then sends "PING" and waits
    for a "PONG" line.

    Args:
        port: Serial device name, e.g. '/dev/ttyACM0'.
        baudrate: Serial speed; must match the sketch.
        timeout: Seconds to wait for the "PONG" reply.

    Returns:
        The open serial.Serial instance.

    Raises:
        Exception: The port could not be opened or used, or no "PONG"
            arrived in time. The port is closed before raising.
    """
    try:
        arduino = serial.Serial(port, baudrate, timeout=1)
    except serial.SerialException as e:
        raise Exception(f"Connection failed: {e}") from e

    connected = False
    try:
        time.sleep(2)  # Wait for reset
        # Clear any boot messages
        arduino.reset_input_buffer()
        # Test communication with ping/pong
        arduino.write(b"PING\n")
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # readline() already waits up to 1 s, so no extra sleep is needed.
            line = arduino.readline().decode(errors='replace').strip()
            if line == "PONG":
                print("Arduino connected and ready")
                connected = True
                return arduino
        raise Exception("Arduino not responding to PING")
    except serial.SerialException as e:
        raise Exception(f"Connection failed: {e}") from e
    finally:
        # Any failure path (including Ctrl+C) must not leak the open port.
        if not connected:
            arduino.close()
# Arduino should respond to PING with PONG
arduino = connect_arduino_reliable(port='/dev/ttyACM0')
Add a PING handler to your Arduino sketch (inside loop()):
// In your Arduino code
void setup() {
  const unsigned long BAUD_RATE = 9600;
  Serial.begin(BAUD_RATE);
  // ... other setup ...
}
// Answers the host's "PING" with "PONG" so it can confirm the sketch is running.
void loop() {
  if (Serial.available()) {
    // readStringUntil('\n') returns as soon as the newline sent by the host
    // arrives; readString() would wait for the serial timeout first.
    String cmd = Serial.readStringUntil('\n');
    cmd.trim();
    if (cmd == "PING") {
      Serial.println("PONG");
    }
    // ... handle other commands ...
  }
}
Best Practices
Error Handling
Always handle serial exceptions and timeouts
Reset Timing
Wait 2 seconds after opening Arduino connection
Data Format
Use JSON or CSV for structured data exchange
Flow Control
Implement handshaking for reliable communication
Production-Ready Pattern:
import serial
import json
import logging
import time
from typing import Optional
class ArduinoInterface:
    """Production-ready Arduino interface"""

    def __init__(self, port: str, baudrate: int = 9600):
        """Store connection settings; the port is opened by connect()."""
        self.port = port
        self.baudrate = baudrate
        self.arduino: Optional[serial.Serial] = None
        self.logger = logging.getLogger(__name__)

    def connect(self) -> bool:
        """Connect with retry logic.

        Makes up to three attempts. Returns True once the Arduino answers
        PING with PONG; returns False with the port closed otherwise.
        """
        for attempt in range(3):
            try:
                self.arduino = serial.Serial(
                    self.port,
                    self.baudrate,
                    timeout=2
                )
                time.sleep(2)  # Arduino reset
                self.arduino.reset_input_buffer()
                # Test connection
                if self._test_connection():
                    self.logger.info("Connected to Arduino on %s", self.port)
                    return True
                self.logger.warning(
                    "Connection attempt %d failed: %s",
                    attempt + 1, "no PONG reply",
                )
            except Exception as e:
                self.logger.warning(
                    "Connection attempt %d failed: %s", attempt + 1, e
                )
            # Release the port before retrying or giving up so a failed
            # attempt never leaves it open.
            self.close()
            time.sleep(1)
        return False

    def _test_connection(self) -> bool:
        """Test if Arduino is responding to PING with PONG."""
        # Exception (not a bare except) so Ctrl+C still interrupts.
        try:
            if not self.send_command("PING"):
                return False
            return self.read_response(timeout=5) == "PONG"
        except Exception:
            return False

    def send_command(self, command: str) -> bool:
        """Send command to Arduino.

        Returns False when not connected or when the write fails.
        """
        if not self.arduino:
            return False
        try:
            self.arduino.write((command + '\n').encode())
            return True
        except Exception as e:
            self.logger.error("Send error: %s", e)
            return False

    def read_response(self, timeout: float = 2) -> Optional[str]:
        """Read response with timeout.

        Returns the stripped reply line, or None when not connected, when
        nothing arrived in time, or when the read fails.
        """
        if not self.arduino:
            return None
        old_timeout = self.arduino.timeout
        self.arduino.timeout = timeout
        try:
            response = self.arduino.readline().decode().strip()
            return response if response else None
        except Exception as e:
            self.logger.error("Read error: %s", e)
            return None
        finally:
            # Restore the port's own timeout for later reads.
            self.arduino.timeout = old_timeout

    def close(self):
        """Clean shutdown; safe to call when not connected."""
        if self.arduino:
            self.arduino.close()
            self.arduino = None
# Usage
arduino = ArduinoInterface('/dev/ttyACM0')
try:
    if arduino.connect():
        arduino.send_command("LED_ON")
        response = arduino.read_response()
        print(f"Arduino says: {response}")
finally:
    # close() is a no-op when no port is open, so it is safe on every path.
    arduino.close()
Next Steps
- Raspberry Pi Integration - Use Pi as Arduino bridge
- Real-time Plotting - Visualize sensor data
- Web Interface - Control Arduino from browser
- Advanced Examples - More complex projects
Arduino + Python = Powerful Combo: You now have the tools to build sophisticated hardware projects with Python's data processing and Arduino's real-time control.
Arduino and PySerial integration opens endless possibilities for IoT, automation, and data collection projects.
How is this guide?