Buffer Management & Memory Optimization

Advanced buffer management techniques for PySerial: circular buffers, memory pools, and zero-copy operations that optimize memory usage and prevent data loss in high-throughput serial applications.

Circular Buffer Implementation

Circular buffers provide constant-time operations and prevent memory fragmentation for continuous data streams.

import threading
import time
from typing import Optional

class CircularBuffer:
    def __init__(self, size: int):
        self.buffer = bytearray(size)
        self.size = size
        self.head = 0  # Write position
        self.tail = 0  # Read position
        self.count = 0  # Number of bytes in buffer
        self.total_bytes_processed = 0  # Lifetime write counter (used by BufferMonitor below)
        self.lock = threading.RLock()
        
    def write(self, data: bytes) -> int:
        """Write data to buffer, returns bytes written"""
        with self.lock:
            if isinstance(data, str):
                data = data.encode()
                
            bytes_written = 0
            for byte in data:
                if self.count < self.size:
                    self.buffer[self.head] = byte
                    self.head = (self.head + 1) % self.size
                    self.count += 1
                    bytes_written += 1
                else:
                    break  # Buffer full
                    
            self.total_bytes_processed += bytes_written
            return bytes_written
            
    def read(self, max_bytes: Optional[int] = None) -> bytes:
        """Read data from buffer"""
        with self.lock:
            if max_bytes is None:
                max_bytes = self.count
            else:
                max_bytes = min(max_bytes, self.count)
                
            if max_bytes == 0:
                return b''
                
            result = bytearray()
            for _ in range(max_bytes):
                result.append(self.buffer[self.tail])
                self.tail = (self.tail + 1) % self.size
                self.count -= 1
                
            return bytes(result)
            
    def peek(self, max_bytes: Optional[int] = None) -> bytes:
        """Read data without removing from buffer"""
        with self.lock:
            if max_bytes is None:
                max_bytes = self.count
            else:
                max_bytes = min(max_bytes, self.count)
                
            if max_bytes == 0:
                return b''
                
            result = bytearray()
            temp_tail = self.tail
            for _ in range(max_bytes):
                result.append(self.buffer[temp_tail])
                temp_tail = (temp_tail + 1) % self.size
                
            return bytes(result)
            
    def readline(self, delimiter: bytes = b'\n') -> Optional[bytes]:
        """Read until delimiter found, or None if no complete line is buffered"""
        with self.lock:
            # peek() returns a contiguous copy, so the search handles
            # wrap-around and multi-byte delimiters correctly
            data = self.peek()
            index = data.find(delimiter)
            if index == -1:
                return None  # Delimiter not found
                
            # Read the line including delimiter
            return self.read(index + len(delimiter))
            
    def available_space(self) -> int:
        """Get available write space"""
        with self.lock:
            return self.size - self.count
            
    def available_data(self) -> int:
        """Get available read data"""
        with self.lock:
            return self.count
            
    def is_full(self) -> bool:
        with self.lock:
            return self.count == self.size
            
    def is_empty(self) -> bool:
        with self.lock:
            return self.count == 0
            
    def clear(self):
        """Clear all data"""
        with self.lock:
            self.head = 0
            self.tail = 0
            self.count = 0

# Buffered Serial Manager
import serial

class BufferedSerial:
    def __init__(self, port, baudrate=9600, buffer_size=64*1024):
        self.serial = serial.Serial(port, baudrate)
        self.rx_buffer = CircularBuffer(buffer_size)
        self.tx_buffer = CircularBuffer(buffer_size)
        self.running = False
        
    def start(self):
        """Start background I/O threads"""
        self.running = True
        self.rx_thread = threading.Thread(target=self._rx_worker)
        self.tx_thread = threading.Thread(target=self._tx_worker)
        self.rx_thread.daemon = True
        self.tx_thread.daemon = True
        self.rx_thread.start()
        self.tx_thread.start()
        
    def _rx_worker(self):
        """Background receive worker"""
        while self.running:
            try:
                if self.serial.in_waiting and not self.rx_buffer.is_full():
                    # Read available data
                    max_read = min(
                        self.serial.in_waiting,
                        self.rx_buffer.available_space()
                    )
                    data = self.serial.read(max_read)
                    self.rx_buffer.write(data)
                time.sleep(0.001)
            except serial.SerialException:
                break
                
    def _tx_worker(self):
        """Background transmit worker"""
        while self.running:
            try:
                if not self.tx_buffer.is_empty():
                    # Send available data
                    data = self.tx_buffer.read(1024)  # Send in chunks
                    self.serial.write(data)
                time.sleep(0.001)
            except serial.SerialException:
                break
                
    def write(self, data: bytes) -> int:
        """Write data to transmit buffer"""
        return self.tx_buffer.write(data)
        
    def read(self, max_bytes: Optional[int] = None) -> bytes:
        """Read data from receive buffer"""
        return self.rx_buffer.read(max_bytes)
        
    def readline(self, delimiter: bytes = b'\n') -> Optional[bytes]:
        """Read line from receive buffer"""
        return self.rx_buffer.readline(delimiter)
        
    def in_waiting(self) -> int:
        """Get bytes available for reading"""
        return self.rx_buffer.available_data()
        
    def out_waiting(self) -> int:
        """Get bytes waiting to be transmitted"""
        return self.tx_buffer.available_data()
        
    def stop(self):
        """Stop background threads and close serial"""
        self.running = False
        if hasattr(self, 'rx_thread'):
            self.rx_thread.join()
        if hasattr(self, 'tx_thread'):
            self.tx_thread.join()
        self.serial.close()
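
A minimal usage sketch of BufferedSerial (the port name is an assumption):

bs = BufferedSerial('/dev/ttyUSB0', baudrate=115200)
bs.start()

bs.write(b'PING\n')      # Queued; the TX thread drains it in the background
time.sleep(0.5)

line = bs.readline()     # Returns None until a full line has arrived
if line is not None:
    print(line)

bs.stop()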

Memory Pool Management

Pre-allocate memory pools to avoid garbage collection overhead in high-frequency operations.

import threading
from typing import List, Optional
from collections import deque

class MemoryPool:
    def __init__(self, block_size: int, pool_size: int = 100):
        self.block_size = block_size
        self.available = deque()
        self.in_use = set()
        self.lock = threading.Lock()
        
        # Pre-allocate memory blocks
        for _ in range(pool_size):
            block = bytearray(block_size)
            self.available.append(block)
            
    def get_block(self) -> Optional[bytearray]:
        """Get a memory block from pool"""
        with self.lock:
            if self.available:
                block = self.available.popleft()
                self.in_use.add(id(block))
                # Clear the block
                block[:] = b'\x00' * len(block)
                return block
            return None  # Pool exhausted
            
    def return_block(self, block: bytearray):
        """Return block to pool"""
        with self.lock:
            block_id = id(block)
            if block_id in self.in_use:
                self.in_use.remove(block_id)
                self.available.append(block)
                
    def stats(self) -> dict:
        """Get pool statistics"""
        with self.lock:
            return {
                'total_blocks': len(self.available) + len(self.in_use),
                'available': len(self.available),
                'in_use': len(self.in_use),
                'block_size': self.block_size
            }
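
A quick usage sketch: acquire a block, use it as scratch space, and always return it, since blocks that are never returned are lost to the pool.

pool = MemoryPool(block_size=4096, pool_size=10)

block = pool.get_block()
if block is not None:
    block[:5] = b'hello'      # Use the zeroed block as scratch space
    # ... process ...
    pool.return_block(block)  # Return it so the pool does not drain

print(pool.stats())  # e.g. {'total_blocks': 10, 'available': 10, ...}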

class PooledSerialBuffer:
    def __init__(self, port, baudrate=9600, block_size=4096):
        self.serial = serial.Serial(port, baudrate)
        self.pool = MemoryPool(block_size, 50)
        self.rx_blocks = deque()
        self.current_rx_block = None
        self.current_rx_pos = 0
        self.lock = threading.Lock()
        self.running = False
        
    def start(self):
        self.running = True
        self.rx_thread = threading.Thread(target=self._rx_worker)
        self.rx_thread.daemon = True
        self.rx_thread.start()
        
    def _rx_worker(self):
        while self.running:
            try:
                if self.serial.in_waiting:
                    data = self.serial.read(self.serial.in_waiting)
                    self._buffer_data(data)
                time.sleep(0.001)
            except serial.SerialException:
                break
                
    def _buffer_data(self, data: bytes):
        """Buffer incoming data using memory pool"""
        with self.lock:
            data_pos = 0
            
            while data_pos < len(data):
                # Get current block or create new one
                if not self.current_rx_block or self.current_rx_pos >= len(self.current_rx_block):
                    self.current_rx_block = self.pool.get_block()
                    if not self.current_rx_block:
                        # Pool exhausted, allocate new block
                        self.current_rx_block = bytearray(self.pool.block_size)
                    self.current_rx_pos = 0
                    
                # Copy data to current block
                remaining_in_block = len(self.current_rx_block) - self.current_rx_pos
                copy_size = min(remaining_in_block, len(data) - data_pos)
                
                end_pos = self.current_rx_pos + copy_size
                self.current_rx_block[self.current_rx_pos:end_pos] = data[data_pos:data_pos + copy_size]
                
                self.current_rx_pos += copy_size
                data_pos += copy_size
                
                # If block is full, add to queue
                if self.current_rx_pos >= len(self.current_rx_block):
                    self.rx_blocks.append((self.current_rx_block, self.current_rx_pos))
                    self.current_rx_block = None
                    
    def read_data(self, max_bytes: Optional[int] = None) -> bytes:
        """Read buffered data"""
        with self.lock:
            # Flush the partially filled current block first; otherwise its
            # data stays invisible until the block fills completely
            if self.current_rx_block and self.current_rx_pos > 0:
                self.rx_blocks.append((self.current_rx_block, self.current_rx_pos))
                self.current_rx_block = None
                self.current_rx_pos = 0
                
            result = bytearray()
            bytes_read = 0
            
            while self.rx_blocks and (max_bytes is None or bytes_read < max_bytes):
                block, used_size = self.rx_blocks[0]
                
                # Calculate how much to read from this block
                available = used_size
                if max_bytes is not None:
                    available = min(available, max_bytes - bytes_read)
                    
                # Copy data
                result.extend(block[:available])
                bytes_read += available
                
                # Update or remove block
                if available >= used_size:
                    # Consumed entire block
                    self.rx_blocks.popleft()
                    self.pool.return_block(block)
                else:
                    # Partial consumption - shift data
                    block[:used_size - available] = block[available:used_size]
                    self.rx_blocks[0] = (block, used_size - available)
                    
            return bytes(result)
            
    def stop(self):
        self.running = False
        if hasattr(self, 'rx_thread'):
            self.rx_thread.join()
            
        # Return all blocks to pool
        with self.lock:
            while self.rx_blocks:
                block, _ = self.rx_blocks.popleft()
                self.pool.return_block(block)
                
            if self.current_rx_block:
                self.pool.return_block(self.current_rx_block)
                
        self.serial.close()

For traffic with mixed message sizes, a tiered pool can hand out the smallest pre-allocated block that fits:

class BufferPool:
    """Pool of pre-allocated buffers for different sizes"""
    
    def __init__(self):
        self.pools = {
            1024: MemoryPool(1024, 20),      # Small messages
            4096: MemoryPool(4096, 10),      # Medium messages  
            16384: MemoryPool(16384, 5),     # Large messages
            65536: MemoryPool(65536, 2)      # Very large messages
        }
        self.lock = threading.Lock()
        
    def get_buffer(self, min_size: int) -> Optional[bytearray]:
        """Get smallest suitable buffer"""
        with self.lock:
            for size in sorted(self.pools.keys()):
                if size >= min_size:
                    return self.pools[size].get_block()
            return None  # No suitable buffer
            
    def return_buffer(self, buffer: bytearray):
        """Return buffer to appropriate pool"""
        with self.lock:
            buffer_size = len(buffer)
            if buffer_size in self.pools:
                self.pools[buffer_size].return_block(buffer)
                
    def get_stats(self) -> dict:
        """Get statistics for all pools"""
        with self.lock:
            stats = {}
            for size, pool in self.pools.items():
                stats[f'pool_{size}'] = pool.stats()
            return stats

# Usage with automatic buffer sizing
class AdaptiveBufferedSerial:
    def __init__(self, port, baudrate=9600):
        self.serial = serial.Serial(port, baudrate)
        self.buffer_pool = BufferPool()
        self.message_sizes = deque(maxlen=100)  # Track message sizes
        self.running = False
        
    def _predict_buffer_size(self) -> int:
        """Predict needed buffer size based on history"""
        if not self.message_sizes:
            return 1024
            
        avg_size = sum(self.message_sizes) / len(self.message_sizes)
        max_size = max(self.message_sizes)
        
        # Use 150% of average or max, whichever is larger
        predicted = max(int(avg_size * 1.5), max_size)
        
        # Round up to next pool size
        for size in sorted(self.buffer_pool.pools.keys()):
            if size >= predicted:
                return size
        return 65536  # Largest available
        
    def read_message(self, delimiter: bytes = b'\n') -> Optional[bytes]:
        """Read complete message with optimal buffer allocation"""
        buffer_size = self._predict_buffer_size()
        buffer = self.buffer_pool.get_buffer(buffer_size)
        
        if not buffer:
            # Fallback to regular allocation
            buffer = bytearray(buffer_size)
            
        try:
            # Read message into buffer one byte at a time
            pos = 0
            while pos < len(buffer):
                if self.serial.in_waiting:
                    byte = self.serial.read(1)
                    if not byte:
                        continue  # Read timed out with no data
                    buffer[pos] = byte[0]
                    pos += 1
                    
                    if buffer[max(0, pos - len(delimiter)):pos] == delimiter:
                        # Found complete message
                        message = bytes(buffer[:pos])
                        self.message_sizes.append(pos)
                        return message
                else:
                    time.sleep(0.001)  # Avoid busy-waiting
                    
        finally:
            # Return buffer to pool (plain bytearray fallbacks are ignored)
            if len(buffer) in self.buffer_pool.pools:
                self.buffer_pool.return_buffer(buffer)
                
        return None  # Buffer filled before a delimiter was seen
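
A hypothetical usage sketch ('/dev/ttyUSB0' and a line-oriented device are assumptions):

adaptive = AdaptiveBufferedSerial('/dev/ttyUSB0', baudrate=115200)

for _ in range(10):
    msg = adaptive.read_message(delimiter=b'\n')
    if msg:
        # After a few messages the predictor converges on a pool size
        print(f"{len(msg)} bytes; next buffer: {adaptive._predict_buffer_size()}")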
Message Object Pooling

Object pooling extends beyond raw buffers: reusing Message instances avoids allocating a new object for every frame in high-frequency streams.

import json
import struct
from dataclasses import dataclass
from typing import Any, Dict

@dataclass
class Message:
    msg_id: int
    timestamp: float
    data: bytes
    metadata: Dict[str, Any] = None
    
    def serialize(self) -> bytes:
        """Serialize message to bytes"""
        # JSON keeps metadata round-trippable without eval() on the receive side
        metadata_bytes = json.dumps(self.metadata or {}).encode()
        
        return struct.pack(
            '>IQI',  # Big-endian: uint32 id, uint64 timestamp, uint32 data length
            self.msg_id,
            int(self.timestamp * 1000000),  # Microseconds
            len(self.data)
        ) + self.data + struct.pack('>I', len(metadata_bytes)) + metadata_bytes
        
    @classmethod
    def deserialize(cls, data: bytes) -> 'Message':
        """Deserialize message from bytes"""
        if len(data) < 16:  # Minimum size
            raise ValueError("Invalid message data")
            
        # Unpack header
        msg_id, timestamp_us, data_len = struct.unpack('>IQI', data[:16])
        timestamp = timestamp_us / 1000000.0
        
        # Extract message data
        msg_data = data[16:16 + data_len]
        
        # Extract metadata
        metadata_len = struct.unpack('>I', data[16 + data_len:20 + data_len])[0]
        metadata_bytes = data[20 + data_len:20 + data_len + metadata_len]
        metadata = eval(metadata_bytes.decode()) if metadata_bytes else {}
        
        return cls(msg_id, timestamp, msg_data, metadata)
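
A minimal round-trip sketch of the wire format (values are illustrative):

original = Message(msg_id=1, timestamp=time.time(), data=b'sensor:42',
                   metadata={'source': 'demo'})
wire = original.serialize()      # 16-byte header + data + metadata block
restored = Message.deserialize(wire)
assert restored.data == original.data and restored.metadata == original.metadata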

class MessagePool:
    def __init__(self, pool_size: int = 1000):
        self.available = deque()
        self.in_use = set()
        self.lock = threading.Lock()
        
        # Pre-allocate message objects
        for i in range(pool_size):
            msg = Message(0, 0.0, b'', {})
            self.available.append(msg)
            
    def get_message(self) -> Optional[Message]:
        """Get message from pool"""
        with self.lock:
            if self.available:
                msg = self.available.popleft()
                self.in_use.add(id(msg))
                # Reset message
                msg.msg_id = 0
                msg.timestamp = 0.0
                msg.data = b''
                msg.metadata = {}
                return msg
            return None
            
    def return_message(self, msg: Message):
        """Return message to pool"""
        with self.lock:
            msg_id = id(msg)
            if msg_id in self.in_use:
                self.in_use.remove(msg_id)
                self.available.append(msg)

# High-performance message processor
class MessageProcessor:
    def __init__(self, port, baudrate=9600):
        self.serial = serial.Serial(port, baudrate)
        self.msg_pool = MessagePool(200)
        self.buffer_pool = BufferPool()
        self.processed_messages = deque(maxlen=1000)
        self.stats = {
            'messages_processed': 0,
            'bytes_processed': 0,
            'pool_hits': 0,
            'pool_misses': 0
        }
        
    def process_stream(self, callback=None):
        """Process incoming message stream"""
        buffer = self.buffer_pool.get_buffer(4096)
        if buffer:
            self.stats['pool_hits'] += 1
        else:
            buffer = bytearray(4096)
            self.stats['pool_misses'] += 1
            
        try:
            pos = 0
            while True:
                if self.serial.in_waiting:
                    # Read available data
                    available = min(self.serial.in_waiting, len(buffer) - pos)
                    data = self.serial.read(available)
                    buffer[pos:pos + available] = data
                    pos += available
                    
                    # Process complete messages
                    while pos >= 16:  # Minimum header size
                        # Parse the fixed header first to learn the full frame size
                        msg_id, timestamp_us, data_len = struct.unpack(
                            '>IQI', bytes(buffer[:16]))
                        if pos < 20 + data_len:
                            break  # Payload or metadata length not received yet
                        metadata_len = struct.unpack(
                            '>I', bytes(buffer[16 + data_len:20 + data_len]))[0]
                        message_size = 20 + data_len + metadata_len
                        if pos < message_size:
                            break  # Metadata not fully received yet
                            
                        # Note: a corrupt header would stall this framing;
                        # production code needs resynchronization logic
                        message = Message.deserialize(bytes(buffer[:message_size]))
                        
                        # Get message object from pool
                        pooled_msg = self.msg_pool.get_message()
                        if pooled_msg:
                            pooled_msg.msg_id = message.msg_id
                            pooled_msg.timestamp = message.timestamp
                            pooled_msg.data = message.data
                            pooled_msg.metadata = message.metadata
                            message = pooled_msg
                            
                        # Process message
                        if callback:
                            callback(message)
                            
                        self.processed_messages.append(message)
                        self.stats['messages_processed'] += 1
                        self.stats['bytes_processed'] += len(message.data)
                        
                        # Remove exactly one message from the front of the buffer
                        buffer[:pos - message_size] = buffer[message_size:pos]
                        pos -= message_size
                        
                    # If buffer is getting full, expand it
                    if pos > len(buffer) * 0.8:
                        new_buffer = self.buffer_pool.get_buffer(len(buffer) * 2)
                        if new_buffer is None:
                            new_buffer = bytearray(len(buffer) * 2)  # Pool exhausted
                        new_buffer[:pos] = buffer[:pos]
                        if len(buffer) in self.buffer_pool.pools:
                            self.buffer_pool.return_buffer(buffer)
                        buffer = new_buffer
                else:
                    time.sleep(0.001)  # Avoid busy-waiting
                            
        finally:
            # Cleanup - plain bytearray fallbacks are simply garbage collected
            if len(buffer) in self.buffer_pool.pools:
                self.buffer_pool.return_buffer(buffer)
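
A hypothetical usage sketch (the port name is an assumption). Note that pooled messages kept in processed_messages are never returned here; a real application should call msg_pool.return_message() once each message is finished with, or the pool will eventually drain.

def on_message(msg):
    print(f"msg {msg.msg_id}: {len(msg.data)} bytes")

processor = MessageProcessor('/dev/ttyUSB0', baudrate=115200)
try:
    processor.process_stream(callback=on_message)  # Blocks; Ctrl+C to stop
except KeyboardInterrupt:
    print(processor.stats)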

Zero-Copy Operations

Zero-copy techniques can significantly improve performance but require careful memory management.
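
As a baseline illustration of the idea, memoryview slices reference the underlying buffer instead of copying it:

payload = bytearray(b'HEADER' + b'x' * 1024)

view = memoryview(payload)[6:]   # Zero-copy: no bytes are duplicated
copy = bytes(payload[6:])        # Regular slice: allocates and copies

view[0:2] = b'OK'                # Writes through to the original buffer
assert payload[6:8] == b'OK'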

import mmap
import os
from typing import Iterator

class ZeroCopyFileBuffer:
    """Memory-mapped file buffer for zero-copy operations"""
    
    def __init__(self, filename: str, size: int = 1024*1024):
        self.filename = filename
        self.size = size
        self.file = None
        self.mmap = None
        self.write_pos = 0
        self.read_pos = 0
        
    def __enter__(self):
        # Create file if it doesn't exist
        if not os.path.exists(self.filename):
            with open(self.filename, 'wb') as f:
                f.write(b'\x00' * self.size)
                
        self.file = open(self.filename, 'r+b')
        self.mmap = mmap.mmap(self.file.fileno(), 0)
        self.size = len(self.mmap)  # Track the actual mapped length
        return self
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.mmap:
            self.mmap.close()
        if self.file:
            self.file.close()
            
    def write_data(self, data: bytes) -> int:
        """Write data to memory-mapped buffer"""
        if self.write_pos + len(data) > self.size:
            # Wrap around (circular buffer behavior)
            available = self.size - self.write_pos
            self.mmap[self.write_pos:] = data[:available]
            remaining = data[available:]
            if remaining:
                self.mmap[:len(remaining)] = remaining
                self.write_pos = len(remaining)
            else:
                self.write_pos = 0
        else:
            self.mmap[self.write_pos:self.write_pos + len(data)] = data
            self.write_pos += len(data)
            
        return len(data)
        
    def get_view(self, size: int) -> memoryview:
        """Get zero-copy view of data"""
        if self.read_pos + size <= self.size:
            view = memoryview(self.mmap)[self.read_pos:self.read_pos + size]
            self.read_pos += size
        else:
            # Handle wrap-around
            part1_size = self.size - self.read_pos
            part1 = memoryview(self.mmap)[self.read_pos:]
            part2 = memoryview(self.mmap)[:size - part1_size]
            # Note: This creates a copy for wrap-around case
            combined = bytearray(part1) + bytearray(part2)
            view = memoryview(combined)
            self.read_pos = size - part1_size
            
        return view

class ZeroCopySerial:
    """Serial interface with zero-copy buffer operations"""
    
    def __init__(self, port, baudrate=9600, buffer_file='/tmp/serial_buffer.dat'):
        self.serial = serial.Serial(port, baudrate)
        self.buffer_file = buffer_file
        self.running = False
        
    def start_logging(self, callback=None):
        """Start zero-copy logging to memory-mapped file"""
        self.running = True
        
        def log_worker():
            with ZeroCopyFileBuffer(self.buffer_file, 10*1024*1024) as buffer:
                while self.running:
                    if self.serial.in_waiting:
                        data = self.serial.read(self.serial.in_waiting)
                        buffer.write_data(data)
                        
                        if callback:
                            # Provide zero-copy view to callback
                            view = memoryview(data)
                            callback(view)
                            
                    time.sleep(0.001)
                    
        self.log_thread = threading.Thread(target=log_worker)
        self.log_thread.daemon = True
        self.log_thread.start()
        
    def read_chunks(self, chunk_size: int = 4096) -> Iterator[memoryview]:
        """Read data in chunks using zero-copy views"""
        # read_pos is per-instance: this walks the mapped region once from
        # the start rather than tracking the logging thread's write position
        with ZeroCopyFileBuffer(self.buffer_file) as buffer:
            while buffer.read_pos + chunk_size <= buffer.size:
                yield buffer.get_view(chunk_size)
                    
    def stop(self):
        self.running = False
        if hasattr(self, 'log_thread'):
            self.log_thread.join()
        self.serial.close()
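
A brief usage sketch (port, timing, and the process() handler are assumptions): log in the background, then iterate the mapped file in zero-copy chunks.

zc = ZeroCopySerial('/dev/ttyUSB0', baudrate=115200)
zc.start_logging(callback=lambda view: print(f"logged {len(view)} bytes"))

time.sleep(5.0)  # Capture for a while
zc.stop()

for chunk in zc.read_chunks(chunk_size=4096):
    process(bytes(chunk))  # 'process' is a placeholder for your handler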

Buffer Monitoring & Statistics

Monitor Buffer Health

import psutil
import threading
import time
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class BufferStats:
    name: str
    size: int
    used: int
    peak_usage: int
    overflow_count: int
    underrun_count: int
    throughput_bps: int

class BufferMonitor:
    def __init__(self):
        self.buffers: Dict[str, CircularBuffer] = {}
        self.stats: Dict[str, BufferStats] = {}
        self.monitoring = False
        self.monitor_thread = None
        
    def register_buffer(self, name: str, buffer: CircularBuffer):
        """Register a buffer for monitoring"""
        self.buffers[name] = buffer
        self.stats[name] = BufferStats(
            name=name,
            size=buffer.size,
            used=0,
            peak_usage=0,
            overflow_count=0,
            underrun_count=0,
            throughput_bps=0
        )
        
    def start_monitoring(self, interval: float = 1.0):
        """Start buffer monitoring"""
        self.monitoring = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(interval,)
        )
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        
    def _monitor_loop(self, interval: float):
        """Monitor buffer statistics"""
        last_bytes = {name: 0 for name in self.buffers}
        
        while self.monitoring:
            for name, buffer in self.buffers.items():
                stats = self.stats[name]
                
                # Update current usage
                current_usage = buffer.available_data()
                stats.used = current_usage
                
                # Track peak usage
                if current_usage > stats.peak_usage:
                    stats.peak_usage = current_usage
                    
                # Calculate throughput from the buffer's lifetime write counter
                current_total = getattr(buffer, 'total_bytes_processed', 0)
                bytes_delta = current_total - last_bytes[name]
                stats.throughput_bps = int(bytes_delta / interval)
                last_bytes[name] = current_total
                
                # Count intervals during which the buffer sat full (overflow pressure)
                if buffer.is_full():
                    stats.overflow_count += 1
                    
            time.sleep(interval)
            
    def get_report(self) -> str:
        """Generate buffer status report"""
        report = ["=== Buffer Status Report ==="]
        
        for name, stats in self.stats.items():
            utilization = (stats.used / stats.size) * 100
            report.append(f"\n{name}:")
            report.append(f"  Size: {stats.size:,} bytes")
            report.append(f"  Used: {stats.used:,} bytes ({utilization:.1f}%)")
            report.append(f"  Peak: {stats.peak_usage:,} bytes")
            report.append(f"  Throughput: {stats.throughput_bps:,} bytes/sec")
            report.append(f"  Overflows: {stats.overflow_count}")
            
        return "\n".join(report)
        
    def stop_monitoring(self):
        """Stop monitoring"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()

Memory Usage Analysis

class MemoryAnalyzer:
    def __init__(self):
        self.baseline_memory = None
        self.peak_memory = 0
        self.allocations = []
        
    def start_analysis(self):
        """Start memory usage analysis"""
        process = psutil.Process()
        self.baseline_memory = process.memory_info().rss
        self.peak_memory = self.baseline_memory
        
    def checkpoint(self, description: str):
        """Create memory checkpoint"""
        process = psutil.Process()
        current_memory = process.memory_info().rss
        
        if current_memory > self.peak_memory:
            self.peak_memory = current_memory
            
        self.allocations.append({
            'description': description,
            'memory_mb': current_memory / 1024 / 1024,
            'delta_mb': (current_memory - self.baseline_memory) / 1024 / 1024,
            'timestamp': time.time()
        })
        
    def get_report(self) -> str:
        """Generate memory usage report"""
        if not self.allocations:
            return "No memory checkpoints recorded"
            
        report = ["=== Memory Usage Analysis ==="]
        report.append(f"Baseline: {self.baseline_memory / 1024 / 1024:.1f} MB")
        report.append(f"Peak: {self.peak_memory / 1024 / 1024:.1f} MB")
        report.append(f"Growth: {(self.peak_memory - self.baseline_memory) / 1024 / 1024:.1f} MB")
        report.append("\nCheckpoints:")
        
        for checkpoint in self.allocations:
            report.append(
                f"  {checkpoint['description']}: "
                f"{checkpoint['memory_mb']:.1f} MB "
                f"(+{checkpoint['delta_mb']:.1f} MB)"
            )
            
        return "\n".join(report)

# Usage example
def analyze_buffer_performance():
    analyzer = MemoryAnalyzer()
    monitor = BufferMonitor()
    
    analyzer.start_analysis()
    analyzer.checkpoint("Initial state")
    
    # Create buffers
    rx_buffer = CircularBuffer(64 * 1024)
    tx_buffer = CircularBuffer(32 * 1024)
    
    analyzer.checkpoint("Buffers created")
    
    # Register for monitoring
    monitor.register_buffer("RX", rx_buffer)
    monitor.register_buffer("TX", tx_buffer)
    monitor.start_monitoring()
    
    analyzer.checkpoint("Monitoring started")
    
    # Simulate data processing
    test_data = b"X" * 1000
    for i in range(100):
        rx_buffer.write(test_data)
        time.sleep(0.01)
        
    analyzer.checkpoint("Data processing complete")
    
    # Generate reports
    print(monitor.get_report())
    print("\n" + analyzer.get_report())
    
    monitor.stop_monitoring()

Auto-tuning Buffer Sizes

class AdaptiveBuffer:
    """Self-tuning circular buffer"""
    
    def __init__(self, initial_size: int = 4096, min_size: int = 1024, max_size: int = 1024*1024):
        self.min_size = min_size
        self.max_size = max_size
        self.buffer = CircularBuffer(initial_size)
        self.stats = {
            'overflow_count': 0,
            'underrun_count': 0,
            'resize_count': 0,
            'avg_utilization': 0.0
        }
        self.utilization_history = deque(maxlen=100)
        
    def write(self, data: bytes) -> int:
        """Write with auto-resize capability"""
        written = self.buffer.write(data)
        
        if written < len(data):
            # Buffer overflow occurred
            self.stats['overflow_count'] += 1
            self._consider_resize_up()
            
        self._update_utilization()
        return written
        
    def read(self, max_bytes: Optional[int] = None) -> bytes:
        """Read with underrun detection"""
        data = self.buffer.read(max_bytes)
        
        if len(data) == 0:
            self.stats['underrun_count'] += 1
            
        self._update_utilization()
        return data
        
    def _update_utilization(self):
        """Update utilization statistics"""
        utilization = self.buffer.available_data() / self.buffer.size
        self.utilization_history.append(utilization)
        
        if self.utilization_history:
            self.stats['avg_utilization'] = sum(self.utilization_history) / len(self.utilization_history)
            
    def _consider_resize_up(self):
        """Consider increasing buffer size"""
        current_size = self.buffer.size
        new_size = min(current_size * 2, self.max_size)
        
        if new_size > current_size:
            self._resize_buffer(new_size)
            
    def _consider_resize_down(self):
        """Consider decreasing buffer size"""
        if len(self.utilization_history) < 50:
            return
            
        # If consistently low utilization, shrink buffer
        if self.stats['avg_utilization'] < 0.25:
            current_size = self.buffer.size
            new_size = max(current_size // 2, self.min_size)
            
            if new_size < current_size:
                self._resize_buffer(new_size)
                
    def _resize_buffer(self, new_size: int):
        """Resize the buffer preserving data"""
        # Read all current data
        old_data = self.buffer.read()
        
        if len(old_data) > new_size:
            # Shrinking below the buffered byte count would drop data
            self.buffer.write(old_data)
            return
            
        # Create new buffer and restore data
        self.buffer = CircularBuffer(new_size)
        self.buffer.write(old_data)
        
        self.stats['resize_count'] += 1
        print(f"Buffer resized to {new_size} bytes")
        
    def periodic_tune(self):
        """Periodic tuning - call regularly"""
        self._consider_resize_down()

Proper buffer management is crucial for high-performance serial applications. Use these techniques to minimize memory allocation overhead and prevent data loss in demanding scenarios.
