PySerial
PySerialDocs

Writing Data

PySerial write methods: write(), writelines(), flush(). String encoding, binary packing, flow control, and error handling.

PySerial sends data as bytes. Every write() call requires a bytes or bytearray object.

MethodInputUse CaseReturns
write(data)bytesSend any dataint (bytes written)
writelines(lines)list of bytesSend multiple chunksNone
flush()NoneForce buffer transmissionNone
send_break()NoneSend break conditionNone

write()

import serial

ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)

# Send bytes directly
bytes_written = ser.write(b'Hello World')
print(f"Sent {bytes_written} bytes")

# Send with line ending
ser.write(b'AT+GMR\r\n')

# Send a single control character
ser.write(b'\x1A')  # Ctrl+Z

String Encoding

Strings must be encoded before writing. utf-8 is the safe default.

message = "Hello Serial World"

# encode() is the standard approach
ser.write(message.encode('utf-8'))

# Format and encode
temperature = 25.6
ser.write(f"TEMP:{temperature:.1f}\n".encode('utf-8'))

# Other encodings when the device requires them
ser.write(message.encode('ascii'))
ser.write(message.encode('latin-1'))

Binary Data

Use struct.pack() to build binary frames.

import struct

# Big-endian unsigned shorts
ser.write(struct.pack('>HHH', 1234, 5678, 9012))

# Little-endian float
ser.write(struct.pack('<f', 25.6))

# Mixed: byte + short + float
ser.write(struct.pack('>BHf', 0xAA, 1234, 25.6))

# Raw byte array
ser.write(bytearray([0xAA, 0x55, 0x01, 0x02, 0x03]))

Log serial data automatically

TofuPilot records test results from your PySerial scripts, tracks pass/fail rates, and generates compliance reports. Free to start.

Write with Confirmation

Send data, then wait for the device to acknowledge.

import time

def write_with_confirm(ser, data, expected=b'OK', timeout=2):
    """Write data and wait for a confirmation response."""
    ser.reset_input_buffer()
    ser.write(data)
    ser.flush()

    response = b''
    deadline = time.time() + timeout
    while time.time() < deadline:
        if ser.in_waiting:
            response += ser.read(ser.in_waiting)
            if expected in response:
                return True, response
        time.sleep(0.01)
    return False, response

ok, resp = write_with_confirm(ser, b'AT+GMR\r\n', b'OK')
print(f"Confirmed: {ok}")

Chunked Writing

For large payloads, send in chunks to avoid overwhelming the device.

import time

def write_chunked(ser, data, chunk_size=1024):
    """Write large data in chunks, waiting for output buffer to drain."""
    total = len(data)
    sent = 0
    for i in range(0, total, chunk_size):
        chunk = data[i:i + chunk_size]
        sent += ser.write(chunk)
        while ser.out_waiting > 512:
            time.sleep(0.01)
    ser.flush()
    return sent

with open('firmware.bin', 'rb') as f:
    write_chunked(ser, f.read())

writelines()

Sends multiple byte strings in sequence.

lines = [
    b'ATE0\r\n',
    b'AT+CMEE=1\r\n',
    b'AT+CREG?\r\n',
]
ser.writelines(lines)
ser.flush()

flush()

Forces immediate transmission of anything sitting in the output buffer. Call it before expecting a response.

ser.write(b'URGENT_COMMAND\r\n')
ser.flush()
response = ser.readline()

Write Timeout

Set write_timeout to avoid blocking forever when the output buffer is full.

import serial

ser = serial.Serial(
    '/dev/ttyUSB0',
    9600,
    timeout=1,
    write_timeout=2
)

try:
    ser.write(large_data)
except serial.SerialTimeoutException:
    print("Write timed out")

Flow Control

Software (XON/XOFF)

ser = serial.Serial('/dev/ttyUSB0', 9600, xonxoff=True)
ser.write(large_data)  # pauses automatically if device sends XOFF

Hardware (RTS/CTS)

ser = serial.Serial('/dev/ttyUSB0', 9600, rtscts=True)

# Manual RTS control
ser.rts = True
if ser.cts:
    ser.write(data)

ACK-based

def write_with_ack(ser, data, chunk_size=64, ack=b'\x06'):
    """Write in chunks, waiting for ACK after each one."""
    for i in range(0, len(data), chunk_size):
        ser.write(data[i:i + chunk_size])
        ser.flush()
        response = ser.read(1)
        if response != ack:
            raise RuntimeError(f"Expected ACK, got {response.hex()}")

Error Handling

import serial
import time

def robust_write(ser, data, max_retries=3):
    """Write with retries and automatic reconnection."""
    if isinstance(data, str):
        data = data.encode('utf-8')

    for attempt in range(max_retries):
        try:
            if not ser.is_open:
                ser.open()
            written = ser.write(data)
            ser.flush()
            if written == len(data):
                return True, written
        except serial.SerialTimeoutException:
            print(f"Write timeout (attempt {attempt + 1})")
        except serial.SerialException as e:
            print(f"Serial error (attempt {attempt + 1}): {e}")
            try:
                ser.close()
                time.sleep(0.5)
                ser.open()
            except Exception:
                pass
        if attempt < max_retries - 1:
            time.sleep(0.1 * (attempt + 1))
    return False, 0

Send Break Signal

Some protocols use the UART break condition as an attention signal.

ser.send_break(duration=0.25)  # 250ms break