Writing Serial Data with PySerial
Master PySerial write methods: write(), writelines(), flush(). Learn encoding, flow control, and reliable data transmission techniques.
Master all PySerial write methods and data transmission techniques.
Write Methods Overview
write()
Send bytes - most common method
writelines()
Send multiple byte strings
flush()
Force transmission of buffered data
send_break()
Send break condition signal
Method | Input Type | Use Case | Returns |
---|---|---|---|
write(data) | bytes | Send any data | int (bytes written) |
writelines(lines) | list of bytes | Send multiple lines | None |
flush() | None | Force buffer flush | None |
send_break() | None | Send break signal | None |
write() Method
The core method for sending data.
import serial
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)
# Send bytes
bytes_written = ser.write(b'Hello World')
print(f"Sent {bytes_written} bytes")
# Send with line ending
ser.write(b'Command\r\n')
# Send single byte
ser.write(b'\x1A') # Ctrl+Z
# Convert string to bytes (required!)
message = "Hello Serial World"
# Method 1: encode()
ser.write(message.encode('utf-8'))
# Method 2: bytes literal
ser.write(b'Hello Serial World')
# Method 3: format and encode
temperature = 25.6
data = f"TEMP:{temperature:.1f}\n"
ser.write(data.encode('utf-8'))
# Common encodings
ser.write(message.encode('ascii')) # ASCII only
ser.write(message.encode('latin-1')) # Extended ASCII
ser.write(message.encode('cp1252')) # Windows encoding
import struct
# Send structured binary data
# Pack integers
data = struct.pack('>HHH', 1234, 5678, 9012) # Big-endian shorts
ser.write(data)
# Pack floats
temperature = 25.6
data = struct.pack('<f', temperature) # Little-endian float
ser.write(data)
# Mixed data packet
packet = struct.pack('>BHf', 0xAA, 1234, 25.6) # Byte, Short, Float
ser.write(packet)
# Send byte array
data = bytearray([0xAA, 0x55, 0x01, 0x02, 0x03])
ser.write(data)
Advanced Write Techniques
Reliable Write with Confirmation
def write_with_confirm(ser, data, expected_response=b'OK', timeout=2):
"""Write data and wait for confirmation"""
# Clear input buffer
ser.reset_input_buffer()
# Send data
bytes_written = ser.write(data)
ser.flush() # Force transmission
# Wait for confirmation
start_time = time.time()
response = b''
while time.time() - start_time < timeout:
if ser.in_waiting:
chunk = ser.read(ser.in_waiting)
response += chunk
if expected_response in response:
return True, bytes_written, response
time.sleep(0.01)
return False, bytes_written, response
# Usage
success, written, response = write_with_confirm(
ser,
b'AT+GMR\r\n',
b'OK'
)
if success:
print(f"✅ Command sent successfully: {response.decode()}")
else:
print(f"❌ No confirmation received")
Chunked Writing for Large Data
def write_large_data(ser, data, chunk_size=1024):
"""Write large data in chunks"""
total_bytes = len(data)
bytes_written = 0
for i in range(0, total_bytes, chunk_size):
chunk = data[i:i + chunk_size]
written = ser.write(chunk)
bytes_written += written
# Optional: wait for buffer space
while ser.out_waiting > 512: # If output buffer supported
time.sleep(0.01)
# Progress indicator
progress = (bytes_written / total_bytes) * 100
print(f"\rSending: {progress:.1f}%", end="", flush=True)
ser.flush()
print(f"\n✅ Sent {bytes_written} bytes")
return bytes_written
# Send large file
with open('large_file.bin', 'rb') as f:
data = f.read()
write_large_data(ser, data)
writelines() Method
Send multiple byte strings efficiently.
# Prepare multiple lines
lines = [
b'COMMAND1\r\n',
b'COMMAND2\r\n',
b'COMMAND3\r\n'
]
# Send all at once
ser.writelines(lines)
# Equivalent to:
for line in lines:
ser.write(line)
Practical example:
def send_command_sequence(ser, commands):
"""Send sequence of AT commands"""
formatted_commands = []
for cmd in commands:
if isinstance(cmd, str):
cmd = cmd.encode('utf-8')
if not cmd.endswith(b'\r\n'):
cmd += b'\r\n'
formatted_commands.append(cmd)
# Send all commands
ser.writelines(formatted_commands)
ser.flush()
# Usage
commands = [
'ATE0', # Disable echo
'AT+CMEE=1', # Enable error reporting
'AT+CREG?', # Check registration
]
send_command_sequence(ser, commands)
Buffer Management
flush() Method
Important: flush()
forces transmission of buffered data. Always flush before expecting immediate responses.
# Send data and force immediate transmission
ser.write(b'URGENT_COMMAND\r\n')
ser.flush() # Don't wait for buffer to fill
# Read immediate response
response = ser.readline()
Write Timeout
# Configure write timeout
ser = serial.Serial(
'/dev/ttyUSB0',
9600,
timeout=1, # Read timeout
write_timeout=2 # Write timeout
)
try:
# This will timeout after 2 seconds if buffer is full
ser.write(large_data)
except serial.SerialTimeoutException:
print("Write operation timed out")
Check Output Buffer
# Monitor output buffer (if supported by platform)
def write_with_flow_control(ser, data, max_buffer=512):
"""Write data with flow control"""
bytes_written = 0
for i in range(0, len(data), max_buffer):
chunk = data[i:i + max_buffer]
# Wait for buffer space
while hasattr(ser, 'out_waiting') and ser.out_waiting > max_buffer:
time.sleep(0.01)
written = ser.write(chunk)
bytes_written += written
return bytes_written
Encoding Strategies
Safe Encoding Function
def safe_encode(text, encodings=['utf-8', 'ascii', 'latin-1']):
"""Try multiple encodings until one works"""
for encoding in encodings:
try:
return text.encode(encoding)
except UnicodeEncodeError:
continue
# Fallback: replace problematic characters
return text.encode('utf-8', errors='replace')
# Usage
text = "Temperature: 25°C" # Contains degree symbol
data = safe_encode(text)
ser.write(data)
Protocol-Specific Formatting
class ProtocolFormatter:
"""Format data for different protocols"""
@staticmethod
def nmea_sentence(sentence_type, data_fields):
"""Format NMEA sentence with checksum"""
# Join data fields
data_part = ','.join(str(field) for field in data_fields)
sentence = f"${sentence_type},{data_part}"
# Calculate checksum
checksum = 0
for char in sentence[1:]: # Skip $
checksum ^= ord(char)
sentence += f"*{checksum:02X}\r\n"
return sentence.encode('ascii')
@staticmethod
def json_message(data):
"""Format JSON message"""
import json
message = json.dumps(data) + '\n'
return message.encode('utf-8')
@staticmethod
def csv_line(values):
"""Format CSV line"""
import csv
from io import StringIO
output = StringIO()
writer = csv.writer(output)
writer.writerow(values)
line = output.getvalue()
return line.encode('utf-8')
# Usage examples
formatter = ProtocolFormatter()
# NMEA sentence
nmea_data = formatter.nmea_sentence('GPGGA', [
'123456.00', '4807.038', 'N', '01131.000', 'E', '1', '08', '0.9', '545.4', 'M'
])
ser.write(nmea_data)
# JSON message
json_data = formatter.json_message({
'temperature': 25.6,
'humidity': 60.2,
'timestamp': '2023-01-01T12:00:00Z'
})
ser.write(json_data)
Flow Control
Software Flow Control (XON/XOFF)
# Enable software flow control
ser = serial.Serial(
'/dev/ttyUSB0',
9600,
xonxoff=True # Enable XON/XOFF flow control
)
# PySerial handles XON/XOFF automatically
ser.write(large_data) # Will pause if device sends XOFF
Hardware Flow Control (RTS/CTS)
# Enable hardware flow control
ser = serial.Serial(
'/dev/ttyUSB0',
9600,
rtscts=True # Enable RTS/CTS flow control
)
# Manual RTS control
ser.rts = False # Request to Send OFF
time.sleep(0.1)
ser.rts = True # Request to Send ON
# Check CTS status
if ser.cts:
print("✅ Clear to send")
ser.write(data)
else:
print("⏸️ Waiting for clear to send")
Custom Flow Control
def write_with_ack(ser, data, ack_byte=b'\x06'):
"""Write data and wait for ACK byte"""
CHUNK_SIZE = 64
for i in range(0, len(data), CHUNK_SIZE):
chunk = data[i:i + CHUNK_SIZE]
# Send chunk
ser.write(chunk)
ser.flush()
# Wait for ACK
ack = ser.read(1)
if ack != ack_byte:
raise Exception(f"Expected ACK, got {ack.hex()}")
print(f"✅ Chunk {i//CHUNK_SIZE + 1} acknowledged")
print("✅ All data sent and acknowledged")
# Usage
try:
write_with_ack(ser, large_data)
except Exception as e:
print(f"❌ Transmission failed: {e}")
Error Handling
Robust Write Function
def robust_write(ser, data, max_retries=3):
"""Write data with error handling and retries"""
if isinstance(data, str):
data = data.encode('utf-8')
for attempt in range(max_retries):
try:
# Check if port is still open
if not ser.is_open:
ser.open()
# Write data
bytes_written = ser.write(data)
ser.flush()
if bytes_written == len(data):
return True, bytes_written
else:
print(f"⚠️ Partial write: {bytes_written}/{len(data)} bytes")
except serial.SerialTimeoutException:
print(f"⏱️ Write timeout on attempt {attempt + 1}")
except serial.SerialException as e:
print(f"❌ Serial error on attempt {attempt + 1}: {e}")
# Try to recover
try:
ser.close()
time.sleep(0.5)
ser.open()
except:
pass
# Wait before retry
if attempt < max_retries - 1:
time.sleep(0.1 * (attempt + 1)) # Increasing delay
return False, 0
# Usage
success, bytes_sent = robust_write(ser, "Important message\r\n")
if success:
print(f"✅ Successfully sent {bytes_sent} bytes")
else:
print("❌ Failed to send data after retries")
Performance Optimization
Buffered Writing
class BufferedWriter:
def __init__(self, ser, buffer_size=4096, flush_interval=0.1):
self.ser = ser
self.buffer = bytearray()
self.buffer_size = buffer_size
self.flush_interval = flush_interval
self.last_flush = time.time()
def write(self, data):
"""Add data to buffer"""
if isinstance(data, str):
data = data.encode('utf-8')
self.buffer.extend(data)
# Auto-flush if buffer full or time elapsed
if (len(self.buffer) >= self.buffer_size or
time.time() - self.last_flush > self.flush_interval):
self.flush()
def flush(self):
"""Flush buffer to serial port"""
if self.buffer:
bytes_written = self.ser.write(self.buffer)
self.ser.flush()
self.buffer.clear()
self.last_flush = time.time()
return bytes_written
return 0
def close(self):
"""Flush and close"""
self.flush()
# Usage
writer = BufferedWriter(ser, buffer_size=1024)
# Add data to buffer
for i in range(1000):
writer.write(f"Data line {i}\n")
# Ensure everything is sent
writer.close()
Send Break Signal
# Send break condition (for special protocols)
ser.send_break(duration=0.25) # 250ms break
# Custom break handling
def send_attention_sequence(ser):
"""Send attention sequence with break"""
ser.send_break(0.5) # Long break
time.sleep(0.1)
ser.write(b'+++') # Escape sequence
time.sleep(1)
ser.write(b'ATH\r\n') # Hang up command
Common Patterns
Command-Response Interface
def send_command(ser, command, expected_response="OK", timeout=2):
"""Send command and validate response"""
# Prepare command
if isinstance(command, str):
command = command.encode('utf-8')
if not command.endswith(b'\r\n'):
command += b'\r\n'
# Clear buffers
ser.reset_input_buffer()
ser.reset_output_buffer()
# Send command
ser.write(command)
ser.flush()
# Read response
start_time = time.time()
response = b''
while time.time() - start_time < timeout:
if ser.in_waiting:
chunk = ser.read(ser.in_waiting)
response += chunk
# Check for expected response
if expected_response.encode() in response:
return True, response.decode('utf-8', errors='ignore')
time.sleep(0.01)
return False, response.decode('utf-8', errors='ignore')
# Usage
success, response = send_command(ser, "AT+GMR", "OK")
if success:
print(f"✅ Command successful: {response}")
File Transfer Protocol
def send_file(ser, file_path, chunk_size=1024):
"""Send file with progress and error checking"""
file_size = os.path.getsize(file_path)
bytes_sent = 0
with open(file_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
# Send chunk with size prefix
chunk_header = struct.pack('>H', len(chunk))
ser.write(chunk_header + chunk)
ser.flush()
# Wait for ACK
ack = ser.read(1)
if ack != b'\x06': # ACK byte
raise Exception("Transfer failed - no ACK")
bytes_sent += len(chunk)
progress = (bytes_sent / file_size) * 100
print(f"\rProgress: {progress:.1f}%", end="", flush=True)
# Send end marker
ser.write(struct.pack('>H', 0))
print(f"\n✅ File sent: {bytes_sent} bytes")
Next Steps
Reading Data
Learn all PySerial read methods
Configuration
Master port settings and parameters
Examples
See writing techniques in real projects
Performance
Optimize for high-speed communication
Writing Mastery: You now understand all PySerial write methods and can handle any data transmission scenario efficiently and reliably.
Proper writing techniques are essential for reliable serial communication. Always handle encoding properly, use appropriate timeouts, and implement error recovery for production applications.
How is this guide?