Your First Agent¶
Terminology: In SW4RM, an “Agent” is a supervised, process‑isolated participant (see “Agents and Agentic Interaction” in documentation/index.md), not just an LLM wrapper or library coroutine.
Build a complete agent that handles messages, manages state, and demonstrates core SDK features.
Overview¶
We'll create an agent that:
- Connects to SW4RM services (Router and Registry)
- Registers itself with service discovery
- Processes incoming DATA messages with automatic ACK handling
- Maintains persistent message history
- Handles graceful shutdown
Basic Agent Structure¶
Create my_first_agent.py:
#!/usr/bin/env python3
"""My first SW4RM - demonstrates core SDK features."""
import grpc
import json
import signal
import sys
from pathlib import Path
from sw4rm.clients.registry import RegistryClient
from sw4rm.clients.router import RouterClient
from sw4rm.activity_buffer import PersistentActivityBuffer
from sw4rm.ack_integration import ACKLifecycleManager, MessageProcessor
from sw4rm import constants as C
class MyFirstAgent:
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.stop_requested = False
# Initialize persistent activity buffer
self.buffer = PersistentActivityBuffer(max_items=500)
def connect(self, router_addr: str, registry_addr: str):
"""Connect to SW4RM services."""
print(f"🔌 Connecting to router: {router_addr}, registry: {registry_addr}")
# Create gRPC connections
router_ch = grpc.insecure_channel(router_addr)
registry_ch = grpc.insecure_channel(registry_addr)
# Initialize clients
self.registry = RegistryClient(registry_ch)
self.router = RouterClient(router_ch)
# Set up ACK lifecycle management
self.ack_manager = ACKLifecycleManager(
router_client=self.router,
activity_buffer=self.buffer,
agent_id=self.agent_id
)
# Set up message processor with handlers
self.processor = MessageProcessor(self.ack_manager)
self.processor.register_handler(C.DATA, self.handle_data)
self.processor.set_default_handler(self.handle_unknown)
Message Handlers¶
Add message processing logic:
def handle_data(self, envelope):
"""Handle DATA messages."""
message_id = envelope.get("message_id", "")
payload = envelope.get("payload", b"")
print(f"📨 Processing DATA message {message_id}")
print(f" Payload size: {len(payload)} bytes")
# Process the message (echo back with metadata)
response = {
"original_id": message_id,
"agent_id": self.agent_id,
"status": "processed",
"payload_size": len(payload),
"timestamp": int(time.time())
}
# Send response using ACK manager
from sw4rm.envelope import build_envelope
response_env = build_envelope(
producer_id=self.agent_id,
message_type=C.DATA,
content_type="application/json",
payload=json.dumps(response).encode()
)
# Note: SDK helpers populate required envelope fields such as
# `correlation_id`, `sequence_number`, and `content_length`. An
# `idempotency_token` may be attached for exactly-once effects.
result = self.ack_manager.send_message_with_ack(response_env)
print(f"✅ Response sent: {result.success}")
return "data_processed"
def handle_unknown(self, envelope):
"""Handle unknown message types."""
msg_type = envelope.get("message_type", "unknown")
print(f"❓ Unknown message type: {msg_type}")
return "unknown_handled"
Registration and Main Loop¶
Add service registration and message processing:
def register(self):
"""Register with the registry service."""
descriptor = {
"agent_id": self.agent_id,
"name": "MyFirstAgent",
"description": "Learning the SW4RM SDK",
"capabilities": ["echo", "processing"],
"communication_class": C.STANDARD,
"modalities_supported": ["application/json"],
}
try:
response = self.registry.register(descriptor)
accepted = getattr(response, 'accepted', False)
reason = getattr(response, 'reason', '')
if accepted:
print(f"✅ Registered successfully: {reason}")
else:
print(f"❌ Registration failed: {reason}")
return accepted
except Exception as e:
print(f"❌ Registration error: {e}")
return False
def run(self):
"""Main message processing loop."""
print(f"🚀 Starting message loop for {self.agent_id}")
try:
for item in self.router.stream_incoming(self.agent_id):
if self.stop_requested:
break
# Convert protobuf message to dict
envelope_msg = getattr(item, "msg", item)
envelope = {
"message_id": getattr(envelope_msg, "message_id", ""),
"message_type": getattr(envelope_msg, "message_type", 0),
"content_type": getattr(envelope_msg, "content_type", ""),
"payload": getattr(envelope_msg, "payload", b""),
"producer_id": getattr(envelope_msg, "producer_id", ""),
}
# Process with automatic ACK handling
result = self.processor.process_message(envelope)
print(f"📋 Processed: {result.success}")
except KeyboardInterrupt:
print("🛑 Stopped by user")
except Exception as e:
print(f"❌ Error in message loop: {e}")
Graceful Shutdown¶
Add cleanup and shutdown logic:
def shutdown(self):
"""Clean shutdown."""
print("🔄 Shutting down...")
self.stop_requested = True
# Save state
self.buffer.flush()
# Deregister
try:
self.registry.deregister(self.agent_id, reason="shutdown")
print("✅ Deregistered successfully")
except Exception as e:
print(f"⚠️ Deregister failed: {e}")
# Show final stats
total = len(self.buffer._by_id)
unacked = len(self.buffer.unacked())
print(f"📊 Final stats: {total} total messages, {unacked} unacked")
def main():
agent = MyFirstAgent("quickstart-agent")
# Handle shutdown signals
def signal_handler(signum, frame):
print(f"\n📡 Received signal {signum}")
agent.shutdown()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Connect and run
agent.connect("localhost:50051", "localhost:50052")
if agent.register():
agent.run()
else:
print("❌ Failed to register, exiting")
return 1
agent.shutdown()
return 0
if __name__ == "__main__":
import sys
sys.exit(main())
Test Your Agent¶
Create a test client (test_my_agent.py):
#!/usr/bin/env python3
"""Test script for my first agent."""
import grpc
import json
import time
from sw4rm.clients.router import RouterClient
from sw4rm.envelope import build_envelope
from sw4rm import constants as C
def test_agent():
# Connect to router
channel = grpc.insecure_channel("localhost:50051")
router = RouterClient(channel)
# Send test message
test_data = {
"message": "Hello from test!",
"timestamp": int(time.time())
}
envelope = build_envelope(
producer_id="test-client",
message_type=C.DATA,
content_type="application/json",
payload=json.dumps(test_data).encode()
)
# Tip: `build_envelope(...)` will generate a `correlation_id` and
# compute `content_length`; you can inspect them on the returned object.
# Send message
response = router.send_message(envelope)
accepted = getattr(response, 'accepted', False)
reason = getattr(response, 'reason', '')
print(f"✅ Message sent: accepted={accepted}, reason={reason}")
if __name__ == "__main__":
test_agent()
Run Your Agent¶
Two Terminal Setup
You'll need two terminals - one for the agent and one for testing.
Terminal 1 - Start your agent:
Expected output:
🔌 Connecting to router: localhost:50051, registry: localhost:50052
✅ Registered successfully:
🚀 Starting message loop for quickstart-agent
Terminal 2 - Test your agent:
Back in Terminal 1, you should see:
Understanding the Flow¶
Here's what happens when you run your agent:
sequenceDiagram
participant Agent
participant Registry
participant Router
participant TestClient
Agent->>Registry: Register("quickstart-agent")
Registry-->>Agent: Registration confirmed
Agent->>Router: StreamIncoming("quickstart-agent")
Router-->>Agent: Stream established
TestClient->>Router: SendMessage(DATA)
Router-->>TestClient: Message accepted
Router->>Agent: Deliver message
Agent->>Agent: Process with handler
Agent->>Router: Send response
Agent->>Router: Send ACKs (RECEIVED, READ, FULFILLED)
Key Concepts¶
Automatic ACK Lifecycle¶
The MessageProcessor automatically sends acknowledgments:
- RECEIVED - Immediately when message arrives
- READ - Before processing begins
- FULFILLED - On successful processing
- REJECTED/FAILED - On processing errors
Activity Buffer¶
The PersistentActivityBuffer tracks:
- All incoming and outgoing messages
- ACK progression for each message
- Messages that need reconciliation
- State persisted to
sw4rm_activity.json
Message Handlers¶
Handlers are functions that:
- Receive envelope dictionaries
- Return status strings
- Are automatically wrapped with ACK logic
- Can send responses through the ACK manager
Troubleshooting¶
"Connection refused"¶
The most common issue is services not running:
"Protobuf stubs not generated"¶
"Permission denied on files"¶
What's Next?¶
Your agent now handles basic message processing with persistent state!
In the next section, we'll add more advanced features:
- Worktree management
- Custom control commands
- Cross-restart state recovery