2.6 Persistence and State Management¶
This guide covers persistence options, recovery patterns, and state management strategies for building resilient SW4RM agents.
2.6.1 Overview¶
SW4RM agents maintain two primary types of persistent state:
| State Type | Purpose | Default Backend |
|---|---|---|
| Activity Buffer | Message processing history and deduplication | JSON file |
| Worktree State | Git repository bindings and workspace context | JSON file |
2.6.2 Persistence Backends¶
The SDK defines a PersistenceBackend protocol that you can implement for custom storage. Two backends are implemented today:
| Backend | Use Case | Status |
|---|---|---|
| JSONFilePersistence | Development, single-agent | ✅ Implemented |
| SQLitePersistence | Production, single-node | ✅ Implemented |
| Redis | Distributed deployments | 🔮 Planned |
| PostgreSQL | Enterprise deployments | 🔮 Planned |
JSON File Backend (Development)¶
The default backend stores state as JSON files on disk. Use this backend for development and single-agent deployments.
```python
from sw4rm.activity_buffer import PersistentActivityBuffer
from sw4rm.persistence import JSONFilePersistence

# Configure file-based persistence
persistence = JSONFilePersistence(
    file_path="/var/lib/sw4rm/state/activity_buffer.json"
)

buffer = PersistentActivityBuffer(
    persistence=persistence,
    dedup_window_s=3600  # 1 hour deduplication window
)
```
Features:
- Atomic writes via temp file + rename
- Base64 encoding for binary payloads
- Automatic corruption recovery (returns empty state)
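The atomic-write pattern is worth seeing in miniature. The sketch below illustrates the temp file + rename technique using only the standard library; it is not the SDK's actual implementation:

```python
import json
import os
import tempfile


def atomic_write_json(path: str, state: dict) -> None:
    """Write state to a temp file, then atomically rename it into place.

    A crash mid-write leaves the previous file intact; readers never
    observe a half-written JSON document.
    """
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())  # ensure bytes reach disk before the rename
        os.replace(tmp_path, path)  # atomic on POSIX and Windows
    except BaseException:
        os.unlink(tmp_path)
        raise
```

Writing to a temp file in the *same directory* matters: `os.replace` is only atomic within a filesystem.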
SQLite Backend (Production Single-Node)¶
Use SQLite for durable persistence with ACID guarantees on single-node deployments.
```python
from sw4rm.activity_buffer import PersistentActivityBuffer
from sw4rm.persistence import SQLitePersistence

# Configure SQLite persistence
persistence = SQLitePersistence(
    db_path="/var/lib/sw4rm/state/activity.sqlite3"
)

buffer = PersistentActivityBuffer(
    persistence=persistence,
    dedup_window_s=3600
)
```
Features:
- WAL mode for concurrent reads
- ACID-compliant transactions
- Automatic schema creation
- Indexed message lookups
Schema:
```sql
CREATE TABLE activity_records (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    message_id TEXT UNIQUE NOT NULL,
    ts_ms INTEGER NOT NULL,
    direction TEXT NOT NULL,
    envelope_json TEXT NOT NULL,
    ack_stage INTEGER NOT NULL,
    error_code INTEGER NOT NULL,
    ack_note TEXT NOT NULL
);
```
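The WAL and schema-creation behavior can be reproduced with the stdlib `sqlite3` module. The sketch below abbreviates the schema and is illustrative, not the SDK's internal code:

```python
import sqlite3

# Abbreviated version of the schema shown above
DDL = """
CREATE TABLE IF NOT EXISTS activity_records (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    message_id TEXT UNIQUE NOT NULL,
    ts_ms INTEGER NOT NULL
);
"""


def open_activity_db(db_path: str) -> sqlite3.Connection:
    """Open (or create) the database with WAL mode and the schema applied."""
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")  # readers no longer block on the writer
    conn.executescript(DDL)                  # CREATE ... IF NOT EXISTS is idempotent
    return conn
```

Note that `message_id TEXT UNIQUE` already creates an implicit index, which is what makes lookups by message ID cheap.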
Custom Backends¶
Implement the PersistenceBackend protocol for custom storage:
```python
from typing import Any, Dict, List

from sw4rm.persistence import PersistenceBackend


class MyCustomPersistence:
    """Custom persistence backend."""

    def save_records(self, records: Dict[str, Dict[str, Any]], order: List[str]) -> None:
        """Save activity records and their ordering."""
        # Your implementation here
        pass

    def load_records(self) -> tuple[Dict[str, Dict[str, Any]], List[str]]:
        """Load activity records and their ordering."""
        # Your implementation here
        return {}, []

    def clear(self) -> None:
        """Clear all stored data."""
        # Your implementation here
        pass
```
2.6.3 Activity Buffer Configuration¶
Buffer Parameters¶
The PersistentActivityBuffer class provides message tracking with deduplication:
```python
from sw4rm.activity_buffer import PersistentActivityBuffer

buffer = PersistentActivityBuffer(
    persistence=persistence,
    dedup_window_s=3600,  # Deduplication window in seconds (default: 3600)
    max_items=1000        # Maximum records to retain (default: 1000)
)
```
Core Methods¶
```python
# Record an incoming message
buffer.record_incoming(envelope_dict)

# Record an outgoing message
buffer.record_outgoing(envelope_dict)

# Update with ACK information (pass the ACK as a dict)
buffer.ack({
    "ack_for_message_id": message_id,
    "ack_stage": 3,  # FULFILLED
    "error_code": 0,
    "note": ""
})

# Get a record by message_id
record = buffer.get(message_id)

# Check for duplicates by idempotency token
existing = buffer.get_by_idempotency_token(token)

# Find all unacked messages (RECEIVED or READ stage)
unacked = buffer.unacked()

# Get recent messages
recent = buffer.recent(n=100)

# Force a save to persistence
buffer.flush()
```
2.6.4 Message Deduplication¶
Idempotency Token Processing¶
The activity buffer deduplicates messages using the Three-ID model:
- message_id: Unique per transmission attempt (UUIDv4)
- idempotency_token: Stable across retries for deduplication
- correlation_id: Links related messages (request/response)
```python
# Check if the message is a duplicate before processing
existing = buffer.get_by_idempotency_token(envelope["idempotency_token"])
if existing:
    # Duplicate detected - skip processing
    return existing

# Record and process the new message
buffer.record_incoming(envelope)
# ... process message ...
buffer.ack({
    "ack_for_message_id": envelope["message_id"],
    "ack_stage": 3  # FULFILLED
})
```
Deduplication Window¶
The deduplication window determines how long idempotency tokens are tracked:
```python
buffer = PersistentActivityBuffer(
    persistence=persistence,
    dedup_window_s=3600,  # 1 hour (default)
)
```
Messages older than dedup_window_s may be pruned, allowing the same idempotency token to be reused after the window expires.
2.6.5 Crash Recovery¶
Recovery Flow¶
When an agent restarts after a crash, the activity buffer automatically loads persisted state:
```mermaid
sequenceDiagram
    participant A as Agent (Restarting)
    participant P as Persistence
    participant R as Router

    A->>P: Load persisted state
    P-->>A: Activity buffer entries
    A->>A: Identify incomplete messages
    Note over A: Status = RECEIVED or READ
    A->>R: Re-request incomplete messages
    R-->>A: Message replay
    A->>A: Resume processing
    A->>P: Update state
```
Reconciliation¶
Use the reconcile() method to find messages that need retry after a restart:
```python
# Load existing state on startup
buffer = PersistentActivityBuffer(persistence=persistence)

# Find messages that weren't fully processed
incomplete = buffer.reconcile()
for record in incomplete:
    if record.ack_stage < 3:  # Not yet FULFILLED
        # Re-process, or request replay from the Router
        handle_incomplete_message(record)
```
Unacked Message Detection¶
The unacked() method finds messages that haven't been fully acknowledged (still in RECEIVED or READ stage):
```python
# Find all messages that still need acknowledgment
stale_messages = buffer.unacked()
for record in stale_messages:
    # Trigger retry logic
    retry_message(record)
```
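A common way to drive that retry loop is capped exponential backoff, so repeated failures don't hammer the Router. The helper below is an illustrative sketch, not an SDK API:

```python
def backoff_schedule(attempts: int, base_s: float = 1.0,
                     cap_s: float = 60.0) -> list[float]:
    """Exponential backoff delays, capped, for re-sending unacked messages.

    Each retry waits twice as long as the last, up to cap_s seconds.
    """
    return [min(base_s * (2 ** i), cap_s) for i in range(attempts)]
```

A retry driver would sleep for each delay in turn before calling its retry function, giving up once the schedule is exhausted.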
2.6.6 Worktree State Persistence¶
Persisting Git Context¶
Use PersistentWorktreeState to track repository bindings across restarts:
```python
from sw4rm.worktree_state import PersistentWorktreeState, WorktreePersistence

# Configure worktree persistence
persistence = WorktreePersistence(file_path="/var/lib/sw4rm/worktree.json")

worktree = PersistentWorktreeState(
    persistence=persistence,
    policy=None  # Use the default policy, or provide a custom WorktreePolicyHook
)

# Bind to a repository
worktree.bind(
    repo_id="org/repo",
    worktree_id="main",
    metadata={"path": "/workspace/repo"}
)
# State is automatically persisted

# On restart, bindings are automatically restored from persistence
```
Worktree Lifecycle¶
```python
# Check the current binding
if worktree.is_bound():
    binding = worktree.current()
    print(f"{binding.repo_id}/{binding.worktree_id}")

# Get detailed status
status = worktree.status()
print(f"Bound: {status['bound']}")

# Switch to a different worktree (unbinds, then rebinds)
worktree.switch(repo_id="org/repo", worktree_id="feature-branch")

# Unbind
worktree.unbind()
```
2.6.7 Planned Features¶
The following features are planned for future releases:
Checkpointing (🔮 Planned)¶
Periodic state snapshots for faster recovery:
```python
# Planned API - not yet implemented
buffer = PersistentActivityBuffer(
    persistence=persistence,
    checkpoint_interval=300,  # Checkpoint every 5 minutes
    max_checkpoints=10,       # Keep the last 10 checkpoints
)
```
Multi-Instance Coordination (🔮 Planned)¶
Distributed locking for multi-agent deployments:
```python
# Planned API - not yet implemented
from sw4rm.persistence import DistributedLock

async with DistributedLock(
    persistence=redis_persistence,
    resource="agent:task-123",
    timeout_seconds=30
):
    await process_task()
```
Retention Policies (🔮 Planned)¶
Automatic data expiration and cleanup:
```python
# Planned API - not yet implemented
buffer = PersistentActivityBuffer(
    persistence=persistence,
    retention_hours=168,     # Delete entries older than 7 days
    cleanup_interval=3600,   # Run cleanup every hour
)
```
Health Checks and Metrics (🔮 Planned)¶
Monitoring and diagnostics for persistence backends:
```python
# Planned API - not yet implemented
health = await persistence.health_check()
stats = buffer.get_statistics()
```
**Current workaround:** For now, implement manual cleanup by periodically calling `buffer.clear()`, or use `buffer.recent()` to identify old entries and prune them with your own logic.
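That manual cleanup can be sketched as a pure function over record dicts carrying a `ts_ms` field, as in the SQLite schema above; `select_expired` is hypothetical, not an SDK function:

```python
def select_expired(records: list[dict], retention_hours: int,
                   now_ms: int) -> list[str]:
    """Return the message_ids of records older than the retention horizon."""
    cutoff_ms = now_ms - retention_hours * 3600 * 1000
    return [r["message_id"] for r in records if r["ts_ms"] < cutoff_ms]
```

The returned IDs could then be removed from a custom backend, or used to decide when a full `buffer.clear()` is warranted.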
2.6.8 Best Practices¶
Development¶
- Use the JSON file backend for local development.
- Set a reasonable `max_items` limit (default: 1000).
- Use a context manager (`with buffer:`) to ensure data is saved on exit.
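The on-exit save can be illustrated with a minimal stand-in class; `FlushOnExit` is not an SDK class, only the shape of the context-manager pattern:

```python
import json


class FlushOnExit:
    """Minimal stand-in: persists its state dict to a JSON file on exit."""

    def __init__(self, path: str):
        self.path = path
        self.state: dict = {}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        with open(self.path, "w") as f:
            json.dump(self.state, f)  # final flush, even if an exception occurred
        return False  # never swallow exceptions


with FlushOnExit("/tmp/sw4rm_demo_state.json") as buf:
    buf.state["seen"] = ["m1"]
```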
Production (Single-Node)¶
- Use SQLite backend for durable persistence with ACID guarantees.
- Configure `max_items` based on expected message volume.
- Call `buffer.flush()` at critical checkpoints.
- Implement application-level cleanup using `reconcile()` after restarts.
Security¶
- Encrypt sensitive payload data before persistence.
- Store persistence files in secure locations with appropriate permissions.
- Consider encrypting the SQLite database file at rest.
See Also¶
- First Agent Tutorial - Basic agent implementation
- Installation Guide - Environment setup
- ACK Lifecycle - Message acknowledgment patterns