PySerial
PySerialDocs

Writing Serial Data with PySerial

Master PySerial write methods: write(), writelines(), flush(). Learn encoding, flow control, and reliable data transmission techniques.

Master all PySerial write methods and data transmission techniques.

Write Methods Overview

write()

Send bytes - most common method

writelines()

Send multiple byte strings

flush()

Force transmission of buffered data

send_break()

Send break condition signal

| Method | Input Type | Use Case | Returns |
|---|---|---|---|
| write(data) | bytes | Send any data | int (bytes written) |
| writelines(lines) | list of bytes | Send multiple lines | None |
| flush() | None | Force buffer flush | None |
| send_break() | None | Send break signal | None |

write() Method

The core method for sending data.

import os
import time

import serial

# Open the port with a 1-second read timeout
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)

# Send bytes; the return value is the number of bytes written
bytes_written = ser.write(b'Hello World')
print(f"Sent {bytes_written} bytes")

# Send with line ending (carriage return + line feed)
ser.write(b'Command\r\n')

# Send single byte
ser.write(b'\x1A')  # Ctrl+Z
# Convert string to bytes (required!) - every write() call below is handed
# bytes, never a str
message = "Hello Serial World"

# Method 1: encode()
ser.write(message.encode('utf-8'))

# Method 2: bytes literal
ser.write(b'Hello Serial World')

# Method 3: format and encode
temperature = 25.6
data = f"TEMP:{temperature:.1f}\n"  # one decimal place, newline-terminated
ser.write(data.encode('utf-8'))

# Common encodings
# Each call below transmits the message again in a different encoding.
ser.write(message.encode('ascii'))    # ASCII only
ser.write(message.encode('latin-1'))  # Extended ASCII
ser.write(message.encode('cp1252'))   # Windows encoding
import struct

# Send structured binary data

# Pack integers
# '>HHH' = three big-endian unsigned 16-bit values (6 bytes in total)
data = struct.pack('>HHH', 1234, 5678, 9012)  # Big-endian shorts
ser.write(data)

# Pack floats
temperature = 25.6
# '<f' = one little-endian 32-bit float (4 bytes)
data = struct.pack('<f', temperature)  # Little-endian float
ser.write(data)

# Mixed data packet
# With an explicit byte order no padding is inserted: 1 + 2 + 4 = 7 bytes
packet = struct.pack('>BHf', 0xAA, 1234, 25.6)  # Byte, Short, Float
ser.write(packet)

# Send byte array
# A bytearray is passed to write() directly, without conversion
data = bytearray([0xAA, 0x55, 0x01, 0x02, 0x03])
ser.write(data)

Advanced Write Techniques

Reliable Write with Confirmation

def write_with_confirm(ser, data, expected_response=b'OK', timeout=2):
    """Write data and wait for a confirmation marker in the reply.

    Args:
        ser: Open serial port object. Must provide reset_input_buffer(),
            write(), flush(), in_waiting and read().
        data: Bytes to send.
        expected_response: Byte sequence that signals success as soon as it
            appears anywhere in the accumulated reply.
        timeout: Maximum number of seconds to wait for the marker.

    Returns:
        A tuple ``(success, bytes_written, response)``. ``success`` is True
        when the marker was seen before the timeout; ``response`` holds every
        byte received while waiting, whether or not the marker was found.
    """
    # Clear input buffer so stale bytes cannot be mistaken for the reply
    ser.reset_input_buffer()

    # Send data
    bytes_written = ser.write(data)
    ser.flush()  # Wait until the data has been written out

    # A monotonic deadline is unaffected by wall-clock adjustments
    # (NTP steps, manual clock changes) while we are waiting.
    deadline = time.monotonic() + timeout
    response = b''

    while time.monotonic() < deadline:
        # Read the counter once so the size we test is the size we read
        pending = ser.in_waiting
        if pending:
            response += ser.read(pending)

            if expected_response in response:
                return True, bytes_written, response

        # Short pause so polling does not spin the CPU
        time.sleep(0.01)

    return False, bytes_written, response

# Usage: send a command and require b'OK' somewhere in the reply
success, written, response = write_with_confirm(
    ser, 
    b'AT+GMR\r\n', 
    b'OK'
)

if success:
    # response is raw bytes; decode() with no arguments expects valid UTF-8
    print(f"✅ Command sent successfully: {response.decode()}")
else:
    print(f"❌ No confirmation received")

Chunked Writing for Large Data

def write_large_data(ser, data, chunk_size=1024, max_pending=512):
    """Write large data in chunks, printing a progress indicator.

    Args:
        ser: Open serial port object. Must provide write(), flush() and
            out_waiting.
        data: Bytes-like payload to send.
        chunk_size: Maximum number of bytes handed to each write() call.
        max_pending: After each write, wait until the port reports at most
            this many bytes still queued for output.

    Returns:
        Total number of bytes written (equal to ``len(data)`` on return).

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    total_bytes = len(data)
    bytes_written = 0

    while bytes_written < total_bytes:
        # Slice from the number of bytes actually accepted so far: if write()
        # takes fewer bytes than offered, the remainder is sent on the next
        # pass instead of being skipped.
        chunk = data[bytes_written:bytes_written + chunk_size]
        written = ser.write(chunk)
        bytes_written += written

        if not written:
            # Nothing was accepted; pause briefly instead of spinning
            time.sleep(0.01)

        # Optional: wait for buffer space
        while ser.out_waiting > max_pending:  # If output buffer supported
            time.sleep(0.01)

        # Progress indicator
        progress = (bytes_written / total_bytes) * 100
        print(f"\rSending: {progress:.1f}%", end="", flush=True)

    ser.flush()
    print(f"\n✅ Sent {bytes_written} bytes")
    return bytes_written

# Send large file
# The whole file is read into memory first, then handed to write_large_data().
with open('large_file.bin', 'rb') as f:
    data = f.read()
    write_large_data(ser, data)

writelines() Method

Send multiple byte strings efficiently.

# Prepare multiple lines (each already terminated with CR+LF)
lines = [
    b'COMMAND1\r\n',
    b'COMMAND2\r\n', 
    b'COMMAND3\r\n'
]

# Send all at once
ser.writelines(lines)

# Equivalent to:
# NOTE: shown for comparison only - if this snippet is run as written, the
# loop below transmits the same three lines a second time.
for line in lines:
    ser.write(line)

Practical example:

def send_command_sequence(ser, commands):
    """Send a sequence of AT commands in one batch.

    Each command is converted to bytes (str values are UTF-8 encoded) and
    terminated with CR+LF unless it already ends that way. All commands are
    then written with a single writelines() call followed by flush(); no
    response is read between commands.

    Args:
        ser: Open serial port object. Must provide writelines() and flush().
        commands: Iterable of commands as str, bytes or bytearray. The
            caller's objects are never modified.
    """
    formatted_commands = []

    for cmd in commands:
        if isinstance(cmd, str):
            cmd = cmd.encode('utf-8')
        if not cmd.endswith(b'\r\n'):
            # Build a new object: `cmd += ...` would extend a caller-owned
            # bytearray in place.
            cmd = cmd + b'\r\n'
        formatted_commands.append(cmd)

    # Send all commands
    ser.writelines(formatted_commands)
    ser.flush()

# Usage: plain strings without terminators - send_command_sequence() encodes
# them and appends CR+LF
commands = [
    'ATE0',           # Disable echo
    'AT+CMEE=1',      # Enable error reporting
    'AT+CREG?',       # Check registration
]

send_command_sequence(ser, commands)

Buffer Management

flush() Method

Important: flush() blocks until all data in the output buffer has been written to the port. Call it before waiting for a response so that the command has actually been sent.

# Send data and wait until it has been written out
ser.write(b'URGENT_COMMAND\r\n')
ser.flush()  # Returns once the buffered data has been written

# Read the response line (subject to the port's read timeout)
response = ser.readline()

Write Timeout

# Configure write timeout (independent of the read timeout)
ser = serial.Serial(
    '/dev/ttyUSB0', 
    9600, 
    timeout=1,        # Read timeout
    write_timeout=2   # Write timeout
)

# NOTE: large_data stands for the payload to send; it is not defined in
# this snippet.
try:
    # This will timeout after 2 seconds if buffer is full
    ser.write(large_data)
except serial.SerialTimeoutException:
    print("Write operation timed out")

Check Output Buffer

# Monitor output buffer (if supported by platform)
def write_with_flow_control(ser, data, max_buffer=512):
    """Write data in pieces, pausing while the output buffer is backed up.

    Args:
        ser: Open serial port object. Must provide write(); out_waiting is
            used when the object has it.
        data: Bytes-like payload to send.
        max_buffer: Both the maximum size of each write and the number of
            queued output bytes above which the function waits.

    Returns:
        Total number of bytes written (equal to ``len(data)`` on return).

    Raises:
        ValueError: If ``max_buffer`` is not positive.
    """
    if max_buffer <= 0:
        raise ValueError("max_buffer must be a positive integer")

    total_bytes = len(data)
    bytes_written = 0

    while bytes_written < total_bytes:
        # Wait for buffer space. A port object without out_waiting is treated
        # as always ready; the attribute is read once per check.
        while getattr(ser, 'out_waiting', 0) > max_buffer:
            time.sleep(0.01)

        # Slice from the bytes actually accepted so far, so a short write is
        # continued on the next pass rather than losing the remainder.
        chunk = data[bytes_written:bytes_written + max_buffer]
        written = ser.write(chunk)
        bytes_written += written

        if not written:
            # Nothing was accepted; pause briefly instead of spinning
            time.sleep(0.01)

    return bytes_written

Encoding Strategies

Safe Encoding Function

def safe_encode(text, encodings=('utf-8', 'ascii', 'latin-1')):
    """Encode text with the first encoding in ``encodings`` that accepts it.

    Args:
        text: The str to encode.
        encodings: Encoding names to try, in order. With the default order
            UTF-8 comes first and accepts almost any str, so the later
            entries are only reached for text UTF-8 rejects (such as lone
            surrogates). Pass a different order to prefer a narrower
            encoding.

    Returns:
        The encoded bytes. If every listed encoding fails, the text is
        encoded as UTF-8 with unencodable characters replaced by ``?``.
    """
    # The default is a tuple rather than a list so that no mutable object is
    # shared between calls.
    for encoding in encodings:
        try:
            return text.encode(encoding)
        except UnicodeEncodeError:
            continue

    # Fallback: replace problematic characters
    return text.encode('utf-8', errors='replace')

# Usage
text = "Temperature: 25°C"  # Contains degree symbol
# With the default encoding order UTF-8 is tried first
data = safe_encode(text)
ser.write(data)

Protocol-Specific Formatting

class ProtocolFormatter:
    """Static helpers that turn payloads into wire-ready bytes."""

    @staticmethod
    def nmea_sentence(sentence_type, data_fields):
        """Build an ASCII-encoded NMEA sentence including its checksum.

        The fields are converted with str() and comma-joined after the
        sentence type. The checksum is the XOR of every character after the
        leading dollar sign, appended as two upper-case hex digits.
        """
        joined_fields = ','.join(map(str, data_fields))
        sentence = f"${sentence_type},{joined_fields}"

        # Fold the character codes together, leaving out the leading '$'
        xor_total = 0
        for code in map(ord, sentence[1:]):
            xor_total ^= code

        return (sentence + f"*{xor_total:02X}\r\n").encode('ascii')

    @staticmethod
    def json_message(data):
        """Serialise data as one newline-terminated, UTF-8 encoded JSON line."""
        import json
        return (json.dumps(data) + '\n').encode('utf-8')

    @staticmethod
    def csv_line(values):
        """Render values as a single CSV record and return it UTF-8 encoded."""
        import csv
        from io import StringIO

        with StringIO() as sink:
            csv.writer(sink).writerow(values)
            return sink.getvalue().encode('utf-8')

# Usage examples
# All formatter methods are static, so they can also be called on the class.
formatter = ProtocolFormatter()

# NMEA sentence: type plus field list; the checksum and CR+LF are appended
nmea_data = formatter.nmea_sentence('GPGGA', [
    '123456.00', '4807.038', 'N', '01131.000', 'E', '1', '08', '0.9', '545.4', 'M'
])
ser.write(nmea_data)

# JSON message: sent as one newline-terminated line
json_data = formatter.json_message({
    'temperature': 25.6,
    'humidity': 60.2,
    'timestamp': '2023-01-01T12:00:00Z'
})
ser.write(json_data)

Flow Control

Software Flow Control (XON/XOFF)

# Enable software flow control
ser = serial.Serial(
    '/dev/ttyUSB0',
    9600,
    xonxoff=True  # Enable XON/XOFF flow control
)

# PySerial handles XON/XOFF automatically
# NOTE: large_data stands for the payload to send; it is not defined in
# this snippet.
ser.write(large_data)  # Will pause if device sends XOFF

Hardware Flow Control (RTS/CTS)

# Enable hardware flow control
ser = serial.Serial(
    '/dev/ttyUSB0',
    9600,
    rtscts=True  # Enable RTS/CTS flow control
)

# Manual RTS control: drop the line for 100 ms, then raise it again
# NOTE(review): toggling RTS by hand while rtscts=True may interact with the
# automatic flow control - confirm on the target platform.
ser.rts = False  # Request to Send OFF
time.sleep(0.1)
ser.rts = True   # Request to Send ON

# Check CTS status (a single check; this snippet does not wait or retry)
if ser.cts:
    print("✅ Clear to send")
    ser.write(data)
else:
    print("⏸️  Waiting for clear to send")

Custom Flow Control

def write_with_ack(ser, data, ack_byte=b'\x06', chunk_size=64):
    """Write data in chunks, waiting for an ACK after each chunk.

    Args:
        ser: Open serial port object. Must provide write(), flush() and
            read().
        data: Bytes-like payload to send.
        ack_byte: Byte sequence the receiver sends to acknowledge a chunk.
        chunk_size: Number of payload bytes sent before each ACK wait.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
        TimeoutError: If the read returns nothing, i.e. no reply arrived
            within the port's read timeout.
        OSError: If a reply arrived but it is not ``ack_byte``.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    offsets = range(0, len(data), chunk_size)
    for chunk_number, offset in enumerate(offsets, start=1):
        chunk = data[offset:offset + chunk_size]

        # Send chunk
        ser.write(chunk)
        ser.flush()

        # Wait for ACK; read as many bytes as the expected acknowledgement
        # so that a multi-byte ack_byte can match.
        ack = ser.read(len(ack_byte))
        if not ack:
            # An empty read means nothing arrived before the read timeout;
            # report that separately from a wrong reply.
            raise TimeoutError(f"No ACK received for chunk {chunk_number}")
        if ack != ack_byte:
            raise OSError(
                f"Expected ACK {ack_byte.hex()}, got {ack.hex()} "
                f"for chunk {chunk_number}"
            )

        print(f"✅ Chunk {chunk_number} acknowledged")

    print("✅ All data sent and acknowledged")

# Usage
# NOTE: large_data stands for the payload to send; it is not defined in
# this snippet.
try:
    write_with_ack(ser, large_data)
except Exception as e:
    # Any failure raised during the transfer is reported here
    print(f"❌ Transmission failed: {e}")

Error Handling

Robust Write Function

def robust_write(ser, data, max_retries=3):
    """Write data with error handling and retries.

    Args:
        ser: Serial port object. Must provide is_open, open(), close(),
            write() and flush().
        data: Payload as bytes, or str (UTF-8 encoded before sending).
        max_retries: Maximum number of write attempts.

    Returns:
        ``(True, bytes_written)`` once an attempt writes the whole payload,
        otherwise ``(False, 0)`` after all attempts have failed.

    Note:
        After a partial write the next attempt sends the whole payload
        again, so the receiver may see the leading bytes more than once.
    """

    if isinstance(data, str):
        data = data.encode('utf-8')

    for attempt in range(max_retries):
        try:
            # Check if port is still open
            if not ser.is_open:
                ser.open()

            # Write data
            bytes_written = ser.write(data)
            ser.flush()

            if bytes_written == len(data):
                return True, bytes_written
            print(f"⚠️  Partial write: {bytes_written}/{len(data)} bytes")

        except serial.SerialTimeoutException:
            print(f"⏱️  Write timeout on attempt {attempt + 1}")

        except serial.SerialException as e:
            print(f"❌ Serial error on attempt {attempt + 1}: {e}")

            # Try to recover by closing and reopening the port. This stays
            # best effort: a failure is reported and the retry loop goes on.
            # Catching Exception rather than using a bare except lets
            # KeyboardInterrupt and SystemExit propagate.
            try:
                ser.close()
                time.sleep(0.5)
                ser.open()
            except Exception as reopen_error:
                print(f"⚠️  Could not reopen port: {reopen_error}")

        # Wait before retry
        if attempt < max_retries - 1:
            time.sleep(0.1 * (attempt + 1))  # Increasing delay

    return False, 0

# Usage: a str payload is accepted and encoded as UTF-8 by robust_write()
success, bytes_sent = robust_write(ser, "Important message\r\n")
if success:
    print(f"✅ Successfully sent {bytes_sent} bytes")
else:
    # bytes_sent is 0 on this path
    print("❌ Failed to send data after retries")

Performance Optimization

Buffered Writing

class BufferedWriter:
    """Collect small writes and send them to a serial port in batches.

    Data is held in memory and passed to the port when the buffer reaches
    ``buffer_size`` bytes or when more than ``flush_interval`` seconds have
    passed since the last flush (checked only when write() is called).
    The object can be used as a context manager; leaving the ``with`` block
    calls close().
    """

    def __init__(self, ser, buffer_size=4096, flush_interval=0.1):
        """Create a writer around an open port.

        Args:
            ser: Serial port object. Must provide write() and flush().
            buffer_size: Buffered byte count that triggers an automatic flush.
            flush_interval: Seconds since the last flush after which the next
                write() triggers a flush.
        """
        self.ser = ser
        self.buffer = bytearray()
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval
        # Monotonic timestamp: interval checks are unaffected by wall-clock
        # adjustments.
        self.last_flush = time.monotonic()

    def write(self, data):
        """Add data to buffer; str input is UTF-8 encoded first."""
        if isinstance(data, str):
            data = data.encode('utf-8')

        self.buffer.extend(data)

        # Auto-flush if buffer full or time elapsed
        if (len(self.buffer) >= self.buffer_size or
                time.monotonic() - self.last_flush > self.flush_interval):
            self.flush()

    def flush(self):
        """Flush buffer to serial port.

        Returns:
            The number of bytes the port accepted, or 0 if the buffer was
            empty. Bytes the port did not accept stay buffered for the next
            flush.
        """
        if not self.buffer:
            return 0

        bytes_written = self.ser.write(self.buffer)
        self.ser.flush()
        # Drop only what was accepted so a short write does not lose the
        # unsent tail.
        del self.buffer[:bytes_written]
        self.last_flush = time.monotonic()
        return bytes_written

    def close(self):
        """Flush until the buffer is empty; the port itself is left open."""
        while self.buffer:
            # Stop if the port accepts nothing, to avoid looping forever
            if not self.flush():
                break

    def __enter__(self):
        """Return the writer so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Send any remaining buffered data when the ``with`` block ends."""
        self.close()

# Usage: auto-flush once 1024 bytes are buffered (or when the flush
# interval has elapsed at the time of a write)
writer = BufferedWriter(ser, buffer_size=1024)

# Add data to buffer; str input is encoded by BufferedWriter.write()
for i in range(1000):
    writer.write(f"Data line {i}\n")

# Ensure everything is sent (close() flushes; it does not close the port)
writer.close()

Send Break Signal

# Send break condition (for special protocols)
# duration is given in seconds
ser.send_break(duration=0.25)  # 250ms break

# Custom break handling
def send_attention_sequence(ser):
    """Send a break, then an escape sequence, then a hang-up command.

    The steps run in a fixed order with fixed pauses between them (about
    1.6 seconds in total including the break). Nothing is read back from
    the port, so the function does not confirm that the device reacted.

    Args:
        ser: Open serial port object providing send_break() and write().
    """
    ser.send_break(0.5)     # Long break (0.5 s)
    time.sleep(0.1)         # Short pause after the break
    ser.write(b'+++')       # Escape sequence, sent without a line ending
    # NOTE(review): the 1 s pause looks like a modem guard time after '+++';
    # confirm the timing the target device requires.
    time.sleep(1)
    ser.write(b'ATH\r\n')   # Hang up command

Common Patterns

Command-Response Interface

def send_command(ser, command, expected_response="OK", timeout=2):
    """Send a command and check the reply for an expected marker.

    Args:
        ser: Open serial port object. Must provide reset_input_buffer(),
            reset_output_buffer(), write(), flush(), in_waiting and read().
        command: Command as str (UTF-8 encoded) or bytes. CR+LF is appended
            unless already present.
        expected_response: Marker, as str or bytes, that signals success as
            soon as it appears anywhere in the accumulated reply.
        timeout: Maximum number of seconds to wait for the marker.

    Returns:
        A tuple ``(success, response_text)``. ``response_text`` is everything
        received while waiting, decoded as UTF-8 with undecodable bytes
        dropped.
    """

    # Prepare command
    if isinstance(command, str):
        command = command.encode('utf-8')
    if not command.endswith(b'\r\n'):
        # New object, so a caller-owned bytearray is not extended in place
        command = command + b'\r\n'

    # Encode the marker once, and accept bytes as well as str so the
    # signature matches helpers that take a bytes marker.
    if isinstance(expected_response, str):
        expected = expected_response.encode()
    else:
        expected = bytes(expected_response)

    # Clear buffers: drops unread input and any output still queued
    ser.reset_input_buffer()
    ser.reset_output_buffer()

    # Send command
    ser.write(command)
    ser.flush()

    # Read response. A monotonic deadline is unaffected by wall-clock
    # adjustments.
    deadline = time.monotonic() + timeout
    response = b''

    while time.monotonic() < deadline:
        # Read the counter once so the size we test is the size we read
        pending = ser.in_waiting
        if pending:
            response += ser.read(pending)

            # Check for expected response
            if expected in response:
                return True, response.decode('utf-8', errors='ignore')

        time.sleep(0.01)

    return False, response.decode('utf-8', errors='ignore')

# Usage: the command is given without a terminator; send_command() adds CR+LF
success, response = send_command(ser, "AT+GMR", "OK")
if success:
    # response is the decoded reply text received before the marker matched
    print(f"✅ Command successful: {response}")

File Transfer Protocol

def send_file(ser, file_path, chunk_size=1024):
    """Send a file in length-prefixed chunks, waiting for an ACK per chunk.

    Each chunk goes out as a 2-byte big-endian length followed by the chunk
    bytes. After every chunk one byte is read and must equal ``0x06``. A
    zero-length header marks the end of the transfer.

    Args:
        ser: Open serial port object. Must provide write(), flush() and
            read().
        file_path: Path of the file to send.
        chunk_size: Payload bytes per chunk. Values above 65535 are reduced
            to 65535, the largest length the 2-byte header can carry.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
        TimeoutError: If no reply arrives after a chunk (empty read).
        OSError: If the reply is not the ACK byte, or the file cannot be
            read.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    # The length prefix is an unsigned 16-bit value; a larger chunk would
    # make struct.pack() fail part-way through the transfer.
    chunk_size = min(chunk_size, 0xFFFF)

    file_size = os.path.getsize(file_path)
    bytes_sent = 0

    with open(file_path, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break

            # Send chunk with size prefix
            chunk_header = struct.pack('>H', len(chunk))
            ser.write(chunk_header + chunk)
            ser.flush()

            # Wait for ACK
            ack = ser.read(1)
            if not ack:
                # Empty read: nothing arrived before the read timeout
                raise TimeoutError("Transfer failed - no ACK")
            if ack != b'\x06':  # ACK byte
                raise OSError(
                    f"Transfer failed - expected ACK, got {ack.hex()}"
                )

            bytes_sent += len(chunk)
            # Guard the division in case the size was reported as 0 but data
            # was still read.
            progress = (bytes_sent / file_size) * 100 if file_size else 100.0
            print(f"\rProgress: {progress:.1f}%", end="", flush=True)

    # Send end marker and make sure it is written out as well
    ser.write(struct.pack('>H', 0))
    ser.flush()
    print(f"\n✅ File sent: {bytes_sent} bytes")

Next Steps

Writing Mastery: You now understand all PySerial write methods and can handle any data transmission scenario efficiently and reliably.

Proper writing techniques are essential for reliable serial communication. Always handle encoding properly, use appropriate timeouts, and implement error recovery for production applications.

How is this guide?