Buffer Management & Memory Optimization
Advanced buffer management techniques for PySerial - circular buffers, memory pools, and zero-copy operations for high-performance applications
Optimize memory usage and prevent buffer overflows with advanced buffer management techniques for high-throughput serial applications.
Circular Buffer Implementation
Circular buffers provide constant-time operations and prevent memory fragmentation for continuous data streams.
import threading
import time
from typing import Optional
class CircularBuffer:
    """Fixed-capacity ring buffer of bytes guarded by a re-entrant lock.

    ``head`` is the next write index, ``tail`` the next read index and
    ``count`` the number of valid bytes currently stored.  Writes never
    overwrite unread data: once the buffer is full, extra bytes are dropped
    and the short count is reported to the caller.
    """

    def __init__(self, size: int):
        self.buffer = bytearray(size)
        self.size = size
        self.head = 0  # Write position
        self.tail = 0  # Read position
        self.count = 0  # Number of bytes in buffer
        # RLock so readline() can call read() while already holding the lock.
        self.lock = threading.RLock()

    def _clamp(self, max_bytes: Optional[int]) -> int:
        """Return how many bytes a read/peek request may actually take."""
        if max_bytes is None:
            return self.count
        # A negative request yields nothing instead of corrupting the indices.
        return max(0, min(max_bytes, self.count))

    def _copy_out(self, length: int) -> bytes:
        """Copy ``length`` valid bytes starting at ``tail`` without consuming them."""
        first = min(length, self.size - self.tail)
        chunk = self.buffer[self.tail:self.tail + first]
        if first < length:
            # The requested span wraps past the end of the storage.
            chunk += self.buffer[:length - first]
        return bytes(chunk)

    def write(self, data: bytes) -> int:
        """Write data to buffer, returns bytes written.

        ``str`` input is encoded with the default codec first.  Only as many
        bytes as fit in the free space are stored; the rest are discarded.
        """
        with self.lock:
            if isinstance(data, str):
                data = data.encode()
            length = min(len(data), self.size - self.count)
            if length <= 0:
                return 0  # Buffer full (or nothing to write)
            # Copy in at most two slices: up to the end of storage, then the
            # wrapped remainder at the front.
            first = min(length, self.size - self.head)
            self.buffer[self.head:self.head + first] = data[:first]
            if first < length:
                self.buffer[:length - first] = data[first:length]
            self.head = (self.head + length) % self.size
            self.count += length
            return length

    def read(self, max_bytes: Optional[int] = None) -> bytes:
        """Remove and return up to ``max_bytes`` bytes (all data when None)."""
        with self.lock:
            length = self._clamp(max_bytes)
            if length == 0:
                return b''
            result = self._copy_out(length)
            self.tail = (self.tail + length) % self.size
            self.count -= length
            return result

    def peek(self, max_bytes: Optional[int] = None) -> bytes:
        """Return up to ``max_bytes`` bytes without removing them."""
        with self.lock:
            length = self._clamp(max_bytes)
            if length == 0:
                return b''
            return self._copy_out(length)

    def readline(self, delimiter: bytes = b'\n') -> Optional[bytes]:
        """Remove and return data up to and including ``delimiter``.

        Returns None (and consumes nothing) when the delimiter is not present
        in the buffered data.  The search runs over a linearised copy of the
        valid bytes only, so a delimiter that straddles the wrap point is
        found and stale bytes outside the valid region can never match.
        """
        with self.lock:
            if self.count == 0:
                return None
            if isinstance(delimiter, str):
                # Mirror write(), which also accepts str.
                delimiter = delimiter.encode()
            pending = self._copy_out(self.count)
            index = pending.find(delimiter)
            if index < 0:
                return None  # Delimiter not found
            # Read the line including delimiter
            return self.read(index + len(delimiter))

    def available_space(self) -> int:
        """Get available write space"""
        with self.lock:
            return self.size - self.count

    def available_data(self) -> int:
        """Get available read data"""
        with self.lock:
            return self.count

    def is_full(self) -> bool:
        """Return True when no more bytes can be written."""
        with self.lock:
            return self.count == self.size

    def is_empty(self) -> bool:
        """Return True when there is nothing to read."""
        with self.lock:
            return self.count == 0

    def clear(self):
        """Clear all data (indices are reset; storage is not zeroed)."""
        with self.lock:
            self.head = 0
            self.tail = 0
            self.count = 0
# Buffered Serial Manager
import serial
class BufferedSerial:
    """Serial port wrapper that moves data through two ring buffers.

    A receive thread copies incoming bytes into ``rx_buffer`` and a transmit
    thread drains ``tx_buffer`` to the port, so callers only touch memory.
    """

    def __init__(self, port, baudrate=9600, buffer_size=64*1024):
        self.serial = serial.Serial(port, baudrate)
        self.rx_buffer = CircularBuffer(buffer_size)
        self.tx_buffer = CircularBuffer(buffer_size)
        self.running = False

    def start(self):
        """Start background I/O threads"""
        self.running = True
        self.rx_thread = threading.Thread(target=self._rx_worker)
        self.tx_thread = threading.Thread(target=self._tx_worker)
        workers = (self.rx_thread, self.tx_thread)
        for worker in workers:
            worker.daemon = True
        for worker in workers:
            worker.start()

    def _rx_worker(self):
        """Background receive worker: port -> rx_buffer, polled every 1 ms."""
        # A SerialException anywhere in the loop ends the worker.
        try:
            while self.running:
                if self.serial.in_waiting and not self.rx_buffer.is_full():
                    # Never request more than the ring buffer can accept.
                    wanted = min(
                        self.serial.in_waiting,
                        self.rx_buffer.available_space(),
                    )
                    self.rx_buffer.write(self.serial.read(wanted))
                time.sleep(0.001)
        except serial.SerialException:
            return

    def _tx_worker(self):
        """Background transmit worker: tx_buffer -> port, polled every 1 ms."""
        # A SerialException anywhere in the loop ends the worker.
        try:
            while self.running:
                if not self.tx_buffer.is_empty():
                    # Send in chunks of at most 1024 bytes.
                    outgoing = self.tx_buffer.read(1024)
                    self.serial.write(outgoing)
                time.sleep(0.001)
        except serial.SerialException:
            return

    def write(self, data: bytes) -> int:
        """Queue data for transmission; returns the number of bytes accepted."""
        accepted = self.tx_buffer.write(data)
        return accepted

    def read(self, max_bytes: int = None) -> bytes:
        """Take up to ``max_bytes`` already-received bytes from the receive buffer."""
        received = self.rx_buffer.read(max_bytes)
        return received

    def readline(self, delimiter: bytes = b'\n') -> Optional[bytes]:
        """Take one delimiter-terminated line from the receive buffer, if any."""
        line = self.rx_buffer.readline(delimiter)
        return line

    def in_waiting(self) -> int:
        """Number of received bytes ready to be read."""
        return self.rx_buffer.available_data()

    def out_waiting(self) -> int:
        """Number of queued bytes not yet handed to the port."""
        return self.tx_buffer.available_data()

    def stop(self):
        """Stop background threads and close serial"""
        self.running = False
        # The threads only exist once start() has been called.
        for attr in ('rx_thread', 'tx_thread'):
            if hasattr(self, attr):
                getattr(self, attr).join()
        self.serial.close()
Memory Pool Management
Pre-allocate memory pools to avoid garbage collection overhead in high-frequency operations.
import threading
from typing import List, Optional
from collections import deque
class MemoryPool:
    """Pool of equally sized, pre-allocated ``bytearray`` blocks.

    Blocks handed out are tracked by ``id()`` in ``in_use`` so that only
    blocks that really came from this pool are accepted back.
    """

    def __init__(self, block_size: int, pool_size: int = 100):
        self.block_size = block_size
        # All blocks are allocated up front.
        self.available = deque(bytearray(block_size) for _ in range(pool_size))
        self.in_use = set()
        self.lock = threading.Lock()

    def get_block(self) -> Optional[bytearray]:
        """Hand out a zero-filled block, or None when the pool is exhausted."""
        with self.lock:
            if not self.available:
                return None  # Pool exhausted
            block = self.available.popleft()
            self.in_use.add(id(block))
            # Wipe whatever the previous user left behind.
            block[:] = bytes(len(block))
            return block

    def return_block(self, block: bytearray):
        """Give a block back; objects this pool did not hand out are ignored."""
        with self.lock:
            key = id(block)
            if key not in self.in_use:
                return
            self.in_use.remove(key)
            self.available.append(block)

    def stats(self) -> dict:
        """Report block counts and the block size as a dict."""
        with self.lock:
            free = len(self.available)
            busy = len(self.in_use)
            return {
                'total_blocks': free + busy,
                'available': free,
                'in_use': busy,
                'block_size': self.block_size,
            }
class PooledSerialBuffer:
    """Receive buffer for a serial port built from pooled fixed-size blocks.

    Incoming bytes fill ``current_rx_block``; when it is full it is queued on
    ``rx_blocks`` as ``(block, used_size)``.  ``read_data`` drains the queued
    blocks first and then the partially filled current block, so received
    bytes are readable immediately rather than only after a whole block has
    accumulated.
    """

    def __init__(self, port, baudrate=9600, block_size=4096):
        if block_size <= 0:
            # A zero-length block can never absorb data, so _buffer_data
            # would loop forever.
            raise ValueError("block_size must be positive")
        self.serial = serial.Serial(port, baudrate)
        self.pool = MemoryPool(block_size, 50)
        self.rx_blocks = deque()
        self.current_rx_block = None
        self.current_rx_pos = 0
        self.lock = threading.Lock()
        self.running = False

    def start(self):
        """Start the background receive thread."""
        self.running = True
        self.rx_thread = threading.Thread(target=self._rx_worker)
        self.rx_thread.daemon = True
        self.rx_thread.start()

    def _rx_worker(self):
        """Poll the port every 1 ms and buffer whatever has arrived."""
        while self.running:
            try:
                if self.serial.in_waiting:
                    data = self.serial.read(self.serial.in_waiting)
                    self._buffer_data(data)
                time.sleep(0.001)
            except serial.SerialException:
                break

    def _buffer_data(self, data: bytes):
        """Buffer incoming data using memory pool"""
        with self.lock:
            data_pos = 0
            total = len(data)
            while data_pos < total:
                # Get current block or create new one
                if (self.current_rx_block is None
                        or self.current_rx_pos >= len(self.current_rx_block)):
                    self.current_rx_block = self.pool.get_block()
                    if self.current_rx_block is None:
                        # Pool exhausted, allocate new block
                        self.current_rx_block = bytearray(self.pool.block_size)
                    self.current_rx_pos = 0
                # Copy data to current block
                block = self.current_rx_block
                copy_size = min(len(block) - self.current_rx_pos, total - data_pos)
                end_pos = self.current_rx_pos + copy_size
                block[self.current_rx_pos:end_pos] = data[data_pos:data_pos + copy_size]
                self.current_rx_pos = end_pos
                data_pos += copy_size
                # If block is full, add to queue
                if self.current_rx_pos >= len(block):
                    self.rx_blocks.append((block, self.current_rx_pos))
                    self.current_rx_block = None

    def read_data(self, max_bytes: int = None) -> bytes:
        """Return up to ``max_bytes`` buffered bytes (everything when None).

        Bytes come out in arrival order: queued full blocks first, then the
        partially filled current block.  Fully consumed queued blocks are
        handed back to the pool.
        """
        with self.lock:
            result = bytearray()

            def budget(limit):
                # How many of ``limit`` available bytes may still be taken.
                if max_bytes is None:
                    return limit
                return max(0, min(limit, max_bytes - len(result)))

            while self.rx_blocks and (max_bytes is None or len(result) < max_bytes):
                block, used_size = self.rx_blocks[0]
                take = budget(used_size)
                result.extend(block[:take])
                if take >= used_size:
                    # Consumed entire block
                    self.rx_blocks.popleft()
                    self.pool.return_block(block)
                else:
                    # Partial consumption - shift the unread bytes to the front
                    block[:used_size - take] = block[take:used_size]
                    self.rx_blocks[0] = (block, used_size - take)

            # Older data is exhausted; continue into the block still being filled.
            if not self.rx_blocks and self.current_rx_block is not None:
                pending = self.current_rx_pos
                take = budget(pending)
                if take:
                    block = self.current_rx_block
                    result.extend(block[:take])
                    block[:pending - take] = block[take:pending]
                    # The block stays current; writing resumes after the
                    # bytes that are still unread.
                    self.current_rx_pos = pending - take
            return bytes(result)

    def stop(self):
        """Stop the receive thread, return all blocks to the pool, close the port."""
        self.running = False
        if hasattr(self, 'rx_thread'):
            self.rx_thread.join()
        # Return all blocks to pool
        with self.lock:
            while self.rx_blocks:
                block, _ = self.rx_blocks.popleft()
                self.pool.return_block(block)
            if self.current_rx_block is not None:
                self.pool.return_block(self.current_rx_block)
                # Drop the reference so a returned block is never written again.
                self.current_rx_block = None
                self.current_rx_pos = 0
        self.serial.close()
class BufferPool:
    """Pool of pre-allocated buffers for different sizes"""

    def __init__(self):
        # One MemoryPool per size class, keyed by block size in bytes.
        self.pools = {
            1024: MemoryPool(1024, 20),    # Small messages
            4096: MemoryPool(4096, 10),    # Medium messages
            16384: MemoryPool(16384, 5),   # Large messages
            65536: MemoryPool(65536, 2)    # Very large messages
        }
        self.lock = threading.Lock()

    def get_buffer(self, min_size: int) -> Optional[bytearray]:
        """Get the smallest available buffer of at least ``min_size`` bytes.

        Size classes are tried in ascending order.  If the best-fitting class
        has no free block the next larger one is tried, so None is returned
        only when every suitable class is exhausted or none is big enough.
        """
        with self.lock:
            for size in sorted(self.pools):
                if size < min_size:
                    continue
                block = self.pools[size].get_block()
                if block is not None:
                    return block
            return None  # No suitable buffer

    def return_buffer(self, buffer: bytearray):
        """Return buffer to the pool whose block size equals ``len(buffer)``.

        Buffers whose length matches no size class are ignored.
        """
        with self.lock:
            pool = self.pools.get(len(buffer))
            if pool is not None:
                pool.return_block(buffer)

    def get_stats(self) -> dict:
        """Get statistics for all pools, keyed ``'pool_<size>'``."""
        with self.lock:
            return {f'pool_{size}': pool.stats() for size, pool in self.pools.items()}
# Usage with automatic buffer sizing
class AdaptiveBufferedSerial:
    """Serial reader that sizes its working buffer from recent message sizes."""

    def __init__(self, port, baudrate=9600):
        self.serial = serial.Serial(port, baudrate)
        self.buffer_pool = BufferPool()
        self.message_sizes = deque(maxlen=100)  # Track message sizes
        self.running = False

    def _predict_buffer_size(self) -> int:
        """Predict needed buffer size based on history.

        Returns 1024 with no history.  Otherwise takes the larger of 150% of
        the average and the maximum recorded size, rounded up to the next
        pool size class (or the largest class when none is big enough).
        """
        if not self.message_sizes:
            return 1024
        avg_size = sum(self.message_sizes) / len(self.message_sizes)
        # Use 150% of average or max, whichever is larger
        predicted = max(int(avg_size * 1.5), max(self.message_sizes))
        size_classes = sorted(self.buffer_pool.pools)
        # Round up to next pool size
        for size in size_classes:
            if size >= predicted:
                return size
        # Largest available class instead of a hard-coded constant.
        return size_classes[-1] if size_classes else predicted

    def read_message(self, delimiter: bytes = b'\n') -> Optional[bytes]:
        """Read one delimiter-terminated message from the port.

        Returns the message including its delimiter, or None when the working
        buffer fills up before the delimiter is seen (the bytes read so far
        are discarded in that case).  Waits for as long as the message takes
        to arrive.
        """
        buffer_size = self._predict_buffer_size()
        buffer = self.buffer_pool.get_buffer(buffer_size)
        if buffer is None:
            # Fallback to regular allocation
            buffer = bytearray(buffer_size)
        delimiter_len = len(delimiter)
        try:
            pos = 0
            while pos < len(buffer):
                if not self.serial.in_waiting:
                    # Yield the CPU instead of spinning while the line is idle.
                    time.sleep(0.001)
                    continue
                byte = self.serial.read(1)
                if not byte:
                    continue  # read returned nothing (e.g. timeout); retry
                buffer[pos] = byte[0]
                pos += 1
                # Compare only once enough bytes exist; a negative slice
                # start would otherwise index from the end of the buffer.
                if pos >= delimiter_len and buffer[pos - delimiter_len:pos] == delimiter:
                    # Found complete message
                    self.message_sizes.append(pos)
                    return bytes(buffer[:pos])
            return None
        finally:
            # Return buffer to pool; return_buffer ignores buffers whose
            # length is not one of the pool's size classes.
            self.buffer_pool.return_buffer(buffer)
import struct
from dataclasses import dataclass
from typing import Any, Dict
@dataclass
class Message:
    """One framed message.

    Wire layout (big-endian):
    ``msg_id:u32 | timestamp_us:u64 | data_len:u32 | data | meta_len:u32 | metadata``
    where ``metadata`` is the UTF-8 text of ``str(dict)``.
    """

    msg_id: int
    timestamp: float
    data: bytes
    metadata: Optional[Dict[str, Any]] = None

    def serialize(self) -> bytes:
        """Serialize message to bytes"""
        metadata_bytes = str(self.metadata or {}).encode()
        header = struct.pack(
            '>IQI',  # Big-endian: uint32, uint64, uint32
            self.msg_id,
            int(self.timestamp * 1000000),  # Microseconds
            len(self.data)
        )
        return header + self.data + struct.pack('>I', len(metadata_bytes)) + metadata_bytes

    @classmethod
    def deserialize(cls, data: bytes) -> 'Message':
        """Deserialize message from bytes.

        Raises:
            ValueError: if ``data`` is shorter than the frame it describes, or
                the metadata is not the text of a plain dict literal.
        """
        import ast  # local import: only needed for metadata parsing

        header_size = 16
        # Smallest frame: header plus the 4-byte metadata length field.
        if len(data) < header_size + 4:
            raise ValueError("Invalid message data")
        # Unpack header
        msg_id, timestamp_us, data_len = struct.unpack_from('>IQI', data, 0)
        timestamp = timestamp_us / 1000000.0
        # Check every length before slicing: slices past the end silently
        # truncate, which would let a partial frame look valid.
        meta_len_at = header_size + data_len
        if len(data) < meta_len_at + 4:
            raise ValueError("Truncated message payload")
        msg_data = data[header_size:meta_len_at]
        (metadata_len,) = struct.unpack_from('>I', data, meta_len_at)
        meta_at = meta_len_at + 4
        if len(data) < meta_at + metadata_len:
            raise ValueError("Truncated message metadata")
        metadata_bytes = data[meta_at:meta_at + metadata_len]
        if not metadata_bytes:
            metadata = {}
        else:
            # SECURITY: this text comes off the wire.  It used to go through
            # eval(), which executes arbitrary expressions; literal_eval
            # accepts Python literals only.
            try:
                metadata = ast.literal_eval(metadata_bytes.decode())
            except (SyntaxError, TypeError, ValueError, RecursionError, MemoryError) as exc:
                raise ValueError("Invalid message metadata") from exc
            if not isinstance(metadata, dict):
                raise ValueError("Invalid message metadata")
        return cls(msg_id, timestamp, msg_data, metadata)
class MessagePool:
def __init__(self, pool_size: int = 1000):
self.available = deque()
self.in_use = set()
self.lock = threading.Lock()
# Pre-allocate message objects
for i in range(pool_size):
msg = Message(0, 0.0, b'', {})
self.available.append(msg)
def get_message(self) -> Optional[Message]:
"""Get message from pool"""
with self.lock:
if self.available:
msg = self.available.popleft()
self.in_use.add(id(msg))
# Reset message
msg.msg_id = 0
msg.timestamp = 0.0
msg.data = b''
msg.metadata = {}
return msg
return None
def return_message(self, msg: Message):
"""Return message to pool"""
with self.lock:
msg_id = id(msg)
if msg_id in self.in_use:
self.in_use.remove(msg_id)
self.available.append(msg)
# High-performance message processor
class MessageProcessor:
    """Parses a continuous stream of framed ``Message`` objects from a port.

    Frame layout (big-endian), as produced by ``Message.serialize``:
    ``msg_id:u32 | timestamp_us:u64 | data_len:u32 | data | meta_len:u32 | metadata``
    """

    _HEADER_SIZE = 16      # bytes in the fixed '>IQI' header
    _MIN_FRAME_SIZE = 20   # header plus the 4-byte metadata length field

    def __init__(self, port, baudrate=9600):
        self.serial = serial.Serial(port, baudrate)
        self.msg_pool = MessagePool(200)
        self.buffer_pool = BufferPool()
        self.processed_messages = deque(maxlen=1000)
        self.stats = {
            'messages_processed': 0,
            'bytes_processed': 0,
            'pool_hits': 0,
            'pool_misses': 0
        }

    def process_stream(self, callback=None, stop_event=None):
        """Process incoming message stream.

        Args:
            callback: optional callable invoked with each parsed message.
            stop_event: optional object with ``is_set()`` (for example a
                ``threading.Event``).  When given, the loop exits once it is
                set; when omitted, the loop runs until an exception
                propagates.
        """
        buffer = self.buffer_pool.get_buffer(4096)
        if buffer is not None:
            self.stats['pool_hits'] += 1
        else:
            buffer = bytearray(4096)
            self.stats['pool_misses'] += 1
        try:
            pos = 0  # number of valid bytes at the front of ``buffer``
            while stop_event is None or not stop_event.is_set():
                waiting = self.serial.in_waiting
                if not waiting:
                    # Idle line: sleep briefly rather than spin.
                    time.sleep(0.001)
                    continue
                room = len(buffer) - pos
                if room:
                    # Read available data
                    data = self.serial.read(min(waiting, room))
                    # Use the real length: a short read must not shrink the
                    # buffer or advance ``pos`` past the stored bytes.
                    buffer[pos:pos + len(data)] = data
                    pos += len(data)
                pos = self._drain_messages(buffer, pos, callback)
                # If buffer is getting full, expand it
                if pos > len(buffer) * 0.8:
                    buffer = self._grow_buffer(buffer, pos)
        finally:
            # Cleanup; return_buffer ignores buffers the pool does not own.
            self.buffer_pool.return_buffer(buffer)

    def _frame_size(self, buffer, offset, end):
        """Return the size of the frame at ``offset``, or None if incomplete."""
        if end - offset < self._MIN_FRAME_SIZE:
            return None
        # data_len is the last u32 of the 16-byte header.
        (data_len,) = struct.unpack_from('>I', buffer, offset + 12)
        meta_len_at = offset + self._HEADER_SIZE + data_len
        if meta_len_at + 4 > end:
            return None
        (meta_len,) = struct.unpack_from('>I', buffer, meta_len_at)
        total = self._MIN_FRAME_SIZE + data_len + meta_len
        return total if offset + total <= end else None

    def _drain_messages(self, buffer, pos, callback):
        """Parse every complete frame in ``buffer[:pos]``; return the new ``pos``."""
        offset = 0
        while True:
            size = self._frame_size(buffer, offset, pos)
            if size is None:
                break  # Not a complete message yet
            frame = bytes(buffer[offset:offset + size])
            # Consume exactly one frame so following frames are not lost.
            offset += size
            try:
                message = Message.deserialize(frame)
            except (ValueError, struct.error, SyntaxError):
                # A complete frame that will not parse never will; skip it
                # rather than stall the stream.
                continue
            self._deliver(message, callback)
        # Move the unparsed tail to the front of the buffer.
        remaining = pos - offset
        if offset:
            buffer[:remaining] = buffer[offset:pos]
        return remaining

    def _deliver(self, message, callback):
        """Copy into a pooled message if one is free, run callback, record stats."""
        # Get message object from pool
        pooled_msg = self.msg_pool.get_message()
        if pooled_msg is not None:
            pooled_msg.msg_id = message.msg_id
            pooled_msg.timestamp = message.timestamp
            pooled_msg.data = message.data
            pooled_msg.metadata = message.metadata
            message = pooled_msg
        # Process message
        if callback:
            callback(message)
        history = self.processed_messages
        if history.maxlen is not None and len(history) == history.maxlen:
            # The oldest entry is about to be evicted; recycle it so the
            # message pool is not drained for good.
            self.msg_pool.return_message(history.popleft())
        history.append(message)
        self.stats['messages_processed'] += 1
        self.stats['bytes_processed'] += len(message.data)

    def _grow_buffer(self, buffer, pos):
        """Return a buffer twice the size holding ``buffer[:pos]`` when one is needed."""
        # NOTE(review): growth has no upper bound other than the 32-bit
        # length fields; a corrupt length can make the buffer grow large.
        new_size = len(buffer) * 2
        new_buffer = self.buffer_pool.get_buffer(new_size)
        if new_buffer is None:
            if pos < len(buffer):
                return buffer  # still room; try the pool again later
            # Completely full and the pool cannot help: a heap allocation
            # keeps the stream moving.
            new_buffer = bytearray(new_size)
        new_buffer[:pos] = buffer[:pos]
        self.buffer_pool.return_buffer(buffer)
        return new_buffer
Zero-Copy Operations
Zero-copy techniques can significantly improve performance but require careful memory management.
import mmap
import os
from typing import Iterator
class ZeroCopyFileBuffer:
    """Memory-mapped file buffer for zero-copy operations.

    Use as a context manager.  The whole file is mapped; ``write_pos`` and
    ``read_pos`` are independent cursors that wrap at ``size``.  Reads are
    not bounded by what has been written.
    """

    def __init__(self, filename: str, size: int = 1024*1024):
        self.filename = filename
        self.size = size
        self.file = None
        self.mmap = None
        self.write_pos = 0
        self.read_pos = 0

    def __enter__(self):
        # Create (or refill) the backing file if it is missing or empty; an
        # empty file cannot be mapped.
        if not os.path.exists(self.filename) or os.path.getsize(self.filename) == 0:
            with open(self.filename, 'wb') as f:
                f.write(b'\x00' * self.size)
        self.file = open(self.filename, 'r+b')
        try:
            self.mmap = mmap.mmap(self.file.fileno(), 0)
        except Exception:
            # Do not leak the file handle when mapping fails.
            self.file.close()
            self.file = None
            raise
        # The mapping covers the whole existing file, which may differ from
        # the requested size; wrap arithmetic must use the real length.
        self.size = len(self.mmap)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.mmap:
            self.mmap.close()
            self.mmap = None
        if self.file:
            self.file.close()
            self.file = None

    def write_data(self, data: bytes) -> int:
        """Write data at ``write_pos``, wrapping at the end; returns ``len(data)``.

        When ``data`` is longer than the buffer only its last ``size`` bytes
        are stored, placed where a byte-by-byte circular write would have
        left them.
        """
        total = len(data)
        if total == 0:
            return 0
        source = memoryview(data)
        start = self.write_pos % self.size
        if total > self.size:
            # Earlier bytes would be overwritten within this same call.
            skipped = total - self.size
            source = source[skipped:]
            start = (start + skipped) % self.size
        first = min(len(source), self.size - start)
        self.mmap[start:start + first] = source[:first]
        wrapped = len(source) - first
        if wrapped:
            # Wrap around (circular buffer behavior)
            self.mmap[:wrapped] = source[first:]
        self.write_pos = (self.write_pos + total) % self.size
        return total

    def get_view(self, size: int) -> memoryview:
        """Return ``size`` bytes starting at ``read_pos`` and advance it.

        A contiguous request is a zero-copy view onto the mapping; release it
        before the buffer is closed.  A request that wraps past the end is
        returned as a view over a freshly built copy.

        Raises:
            ValueError: if ``size`` is negative or larger than the buffer.
        """
        if size < 0 or size > self.size:
            raise ValueError("size must be between 0 and the buffer size")
        start = self.read_pos % self.size
        end = start + size
        if end <= self.size:
            view = memoryview(self.mmap)[start:end]
        else:
            # Handle wrap-around
            head = memoryview(self.mmap)[start:]
            tail = memoryview(self.mmap)[:end - self.size]
            # Note: This creates a copy for wrap-around case
            view = memoryview(bytearray(head) + bytearray(tail))
        # Normalise so a read ending exactly at the boundary restarts at 0.
        self.read_pos = end % self.size
        return view
class ZeroCopySerial:
    """Serial interface with zero-copy buffer operations"""

    def __init__(self, port, baudrate=9600, buffer_file='/tmp/serial_buffer.dat'):
        self.serial = serial.Serial(port, baudrate)
        self.buffer_file = buffer_file
        self.running = False

    def start_logging(self, callback=None):
        """Start a daemon thread that logs received data to the mapped file.

        Args:
            callback: optional callable invoked with a ``memoryview`` of each
                chunk read from the port.
        """
        self.running = True

        def log_worker():
            with ZeroCopyFileBuffer(self.buffer_file, 10*1024*1024) as buffer:
                while self.running:
                    if self.serial.in_waiting:
                        data = self.serial.read(self.serial.in_waiting)
                        buffer.write_data(data)
                        if callback:
                            # Provide zero-copy view to callback
                            callback(memoryview(data))
                    time.sleep(0.001)

        self.log_thread = threading.Thread(target=log_worker)
        self.log_thread.daemon = True
        self.log_thread.start()

    def read_chunks(self, chunk_size: int = 4096) -> Iterator[memoryview]:
        """Yield the mapped buffer file once, in chunks of up to ``chunk_size``.

        Each yielded view is released as soon as the consumer asks for the
        next chunk or closes the generator.  Copy (``bytes(chunk)``) anything
        that must outlive its loop iteration.
        """
        if chunk_size <= 0:
            return
        with ZeroCopyFileBuffer(self.buffer_file) as buffer:
            # get_view wraps around and never returns an empty view, so the
            # pass is bounded by the buffer size to make the iteration end.
            remaining = buffer.size
            while remaining > 0:
                chunk = buffer.get_view(min(chunk_size, remaining))
                if len(chunk) == 0:
                    break
                remaining -= len(chunk)
                try:
                    yield chunk
                finally:
                    # A live view onto the mapping would stop the map from
                    # closing when the with-block exits.
                    chunk.release()

    def stop(self):
        """Stop the logging thread (if started) and close the port."""
        self.running = False
        if hasattr(self, 'log_thread'):
            self.log_thread.join()
        self.serial.close()
Buffer Monitoring & Statistics
Monitor Buffer Health
import psutil
import threading
import time
from dataclasses import dataclass
from typing import Dict, List
@dataclass
class BufferStats:
    """Snapshot of one monitored buffer's counters, filled in by BufferMonitor."""

    name: str  # label the buffer was registered under
    size: int  # buffer capacity, copied from the buffer's ``size`` at registration
    used: int  # bytes held at the last sample (buffer.available_data())
    peak_usage: int  # largest ``used`` value sampled so far
    overflow_count: int  # number of samples at which the buffer reported is_full()
    underrun_count: int  # initialised to 0; never updated by BufferMonitor
    throughput_bps: int  # change in the buffer's byte total per second of interval
class BufferMonitor:
    """Samples registered buffers from a background thread.

    Buffers may be registered before or after monitoring starts.
    """

    def __init__(self):
        self.buffers: Dict[str, "CircularBuffer"] = {}
        self.stats: Dict[str, "BufferStats"] = {}
        self.monitoring = False
        self.monitor_thread = None

    def register_buffer(self, name: str, buffer: "CircularBuffer"):
        """Register a buffer for monitoring"""
        # Publish the stats entry first so the monitor thread never sees a
        # buffer that has no stats record yet.
        self.stats[name] = BufferStats(
            name=name,
            size=buffer.size,
            used=0,
            peak_usage=0,
            overflow_count=0,
            underrun_count=0,
            throughput_bps=0
        )
        self.buffers[name] = buffer

    def start_monitoring(self, interval: float = 1.0):
        """Start buffer monitoring, sampling every ``interval`` seconds."""
        self.monitoring = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(interval,)
        )
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def _monitor_loop(self, interval: float):
        """Monitor buffer statistics until ``monitoring`` is cleared."""
        last_bytes = {}
        while self.monitoring:
            # Iterate over a snapshot: buffers may be registered while
            # sampling is in progress.
            for name, buffer in list(self.buffers.items()):
                stats = self.stats.get(name)
                if stats is None:
                    continue
                # Update current usage
                current_usage = buffer.available_data()
                stats.used = current_usage
                # Track peak usage
                if current_usage > stats.peak_usage:
                    stats.peak_usage = current_usage
                # Calculate throughput (simplified): a buffer without a
                # ``total_bytes_processed`` attribute reports 0.
                current_total = getattr(buffer, 'total_bytes_processed', 0)
                bytes_delta = current_total - last_bytes.get(name, 0)
                stats.throughput_bps = int(bytes_delta / interval) if interval > 0 else 0
                last_bytes[name] = current_total
                # Counts samples at which the buffer was full.
                if buffer.is_full():
                    stats.overflow_count += 1
            time.sleep(interval)

    def get_report(self) -> str:
        """Generate buffer status report"""
        report = ["=== Buffer Status Report ==="]
        for name, stats in self.stats.items():
            # A zero-capacity buffer is reported as 0% utilisation.
            utilization = (stats.used / stats.size) * 100 if stats.size else 0.0
            report.append(f"\n{name}:")
            report.append(f" Size: {stats.size:,} bytes")
            report.append(f" Used: {stats.used:,} bytes ({utilization:.1f}%)")
            report.append(f" Peak: {stats.peak_usage:,} bytes")
            report.append(f" Throughput: {stats.throughput_bps:,} bytes/sec")
            report.append(f" Overflows: {stats.overflow_count}")
        return "\n".join(report)

    def stop_monitoring(self):
        """Stop monitoring and wait for the sampling thread to finish."""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
Memory Usage Analysis
class MemoryAnalyzer:
    """Records the process's resident memory (RSS) at named checkpoints."""

    def __init__(self):
        self.baseline_memory = None
        self.peak_memory = 0
        self.allocations = []

    def start_analysis(self):
        """Start memory usage analysis by recording the current RSS as baseline."""
        process = psutil.Process()
        self.baseline_memory = process.memory_info().rss
        self.peak_memory = self.baseline_memory

    def checkpoint(self, description: str):
        """Create memory checkpoint.

        If ``start_analysis`` has not been called yet, the baseline is taken
        now so that the delta can be computed.
        """
        if self.baseline_memory is None:
            self.start_analysis()
        current_memory = psutil.Process().memory_info().rss
        if current_memory > self.peak_memory:
            self.peak_memory = current_memory
        self.allocations.append({
            'description': description,
            'memory_mb': current_memory / 1024 / 1024,
            'delta_mb': (current_memory - self.baseline_memory) / 1024 / 1024,
            'timestamp': time.time()
        })

    def get_report(self) -> str:
        """Generate memory usage report"""
        if not self.allocations:
            return "No memory checkpoints recorded"
        report = ["=== Memory Usage Analysis ==="]
        report.append(f"Baseline: {self.baseline_memory / 1024 / 1024:.1f} MB")
        report.append(f"Peak: {self.peak_memory / 1024 / 1024:.1f} MB")
        report.append(f"Growth: {(self.peak_memory - self.baseline_memory) / 1024 / 1024:.1f} MB")
        report.append("\nCheckpoints:")
        for checkpoint in self.allocations:
            # The '+' format flag gives a correct sign either way; a
            # hard-coded '+' printed shrinkage as "+-1.0".
            report.append(
                f" {checkpoint['description']}: "
                f"{checkpoint['memory_mb']:.1f} MB "
                f"({checkpoint['delta_mb']:+.1f} MB)"
            )
        return "\n".join(report)
# Usage example
def analyze_buffer_performance():
    """Demo: write test data into monitored ring buffers and print both reports."""
    analyzer, monitor = MemoryAnalyzer(), BufferMonitor()
    analyzer.start_analysis()
    analyzer.checkpoint("Initial state")

    # Create buffers (insertion order is also the registration order).
    rings = {
        "RX": CircularBuffer(64 * 1024),
        "TX": CircularBuffer(32 * 1024),
    }
    analyzer.checkpoint("Buffers created")

    # Register for monitoring
    for label, ring in rings.items():
        monitor.register_buffer(label, ring)
    monitor.start_monitoring()
    analyzer.checkpoint("Monitoring started")

    # Simulate data processing: 100 writes of 1000 bytes, 10 ms apart.
    payload = b"X" * 1000
    receive_ring = rings["RX"]
    for _ in range(100):
        receive_ring.write(payload)
        time.sleep(0.01)
    analyzer.checkpoint("Data processing complete")

    # Generate reports
    buffer_report = monitor.get_report()
    print(buffer_report)
    memory_report = analyzer.get_report()
    print("\n" + memory_report)
    monitor.stop_monitoring()
Auto-tuning Buffer Sizes
class AdaptiveBuffer:
    """Self-tuning circular buffer.

    Wraps a ``CircularBuffer`` and doubles it (up to ``max_size``) when a
    write does not fit.  ``periodic_tune`` halves it (down to ``min_size``)
    after sustained low utilisation.
    """

    def __init__(self, initial_size: int = 4096, min_size: int = 1024, max_size: int = 1024*1024):
        self.min_size = min_size
        self.max_size = max_size
        self.buffer = CircularBuffer(initial_size)
        self.stats = {
            'overflow_count': 0,
            'underrun_count': 0,
            'resize_count': 0,
            'avg_utilization': 0.0
        }
        self.utilization_history = deque(maxlen=100)

    def write(self, data: bytes) -> int:
        """Write with auto-resize capability.

        Returns the number of bytes stored.  On overflow the buffer is grown
        and the remainder is written into the new space; bytes are dropped
        only once ``max_size`` is reached.
        """
        if isinstance(data, str):
            # Encode up front so lengths are compared in bytes, not characters.
            data = data.encode()
        written = self.buffer.write(data)
        if written < len(data):
            # Buffer overflow occurred
            self.stats['overflow_count'] += 1
            # Keep doubling until everything fits or no further growth is
            # possible.
            while written < len(data) and self._consider_resize_up():
                written += self.buffer.write(data[written:])
        self._update_utilization()
        return written

    def read(self, max_bytes: int = None) -> bytes:
        """Read with underrun detection"""
        data = self.buffer.read(max_bytes)
        if len(data) == 0:
            self.stats['underrun_count'] += 1
        self._update_utilization()
        return data

    def _update_utilization(self):
        """Update utilization statistics"""
        size = self.buffer.size
        utilization = self.buffer.available_data() / size if size else 0.0
        self.utilization_history.append(utilization)
        self.stats['avg_utilization'] = (
            sum(self.utilization_history) / len(self.utilization_history)
        )

    def _consider_resize_up(self) -> bool:
        """Double the buffer if ``max_size`` allows; return True if it grew."""
        current_size = self.buffer.size
        new_size = min(current_size * 2, self.max_size)
        if new_size > current_size:
            return self._resize_buffer(new_size)
        return False

    def _consider_resize_down(self):
        """Consider decreasing buffer size"""
        if len(self.utilization_history) < 50:
            return
        # If consistently low utilization, shrink buffer
        if self.stats['avg_utilization'] < 0.25:
            current_size = self.buffer.size
            new_size = max(current_size // 2, self.min_size)
            if new_size < current_size:
                self._resize_buffer(new_size)

    def _resize_buffer(self, new_size: int) -> bool:
        """Resize the buffer preserving data; return True if it was resized.

        A resize that could not hold the currently buffered bytes is refused,
        because moving the data across would truncate it.
        """
        if new_size < self.buffer.available_data():
            return False
        # Read all current data
        old_data = self.buffer.read()
        # Create new buffer
        self.buffer = CircularBuffer(new_size)
        # Restore data
        self.buffer.write(old_data)
        self.stats['resize_count'] += 1
        print(f"Buffer resized to {new_size} bytes")
        return True

    def periodic_tune(self):
        """Periodic tuning - call regularly"""
        self._consider_resize_down()
Circular Buffers
Implement efficient ring buffers for continuous data streams
Memory Pools
Pre-allocate memory to avoid GC overhead
Zero-Copy
Use memory views and mapping for optimal performance
Monitoring
Track buffer health and auto-tune parameters
Proper buffer management is crucial for high-performance serial applications. Use these techniques to minimize memory allocation overhead and prevent data loss in demanding scenarios.
How is this guide?