3.11 Advanced Patterns (v0.5.0)
This section documents the advanced multi-agent coordination patterns introduced in SW4RM Protocol v0.5.0. These patterns support multi-critic artifact approval, delegation of work between agents, and DAG-based workflow orchestration.
3.11.0 Negotiation Foundations
The following subsections document the core negotiation mechanics that underpin the Negotiation Room, Handoff, and Workflow patterns.
3.11.0.1 Negotiation Event Fanout (§17.1)
Negotiation events are carried as NEGOTIATION envelopes whose payload is a JSON object. Implementations MUST preserve raw payload bytes and correlation_id. Unknown fields MUST be ignored by receivers. The following event kinds are defined:
| Kind | Payload Schema | Description |
|---|---|---|
| open | { kind, ts, topic: string, corr: string } | Opens a new negotiation session |
| policy | { kind, ts, negotiation_id: string, profile?: string, policy: NegotiationPolicy } | Broadcasts the effective policy |
| propose | { kind, ts, from: string, ct: string, payload_b64: string } | Submits a proposal |
| counter | { kind, ts, from: string, ct: string, payload_b64: string } | Submits a counter-proposal |
| evaluate | { kind, ts, from: string, score: number, notes: string } | Evaluates a proposal |
| decide | { kind, ts, by: string, ct: string, result_b64: string } | Issues a final decision |
| abort | { kind, ts, reason: string } | Aborts the negotiation |
`payload_b64` and `result_b64` carry the opaque proposal and result bytes, base64-encoded; `ct` is their content type. SDKs SHOULD provide convenience helpers that decode these fields on demand. Services MUST NOT reorder events; event order is the order of the service stream.
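A receiver can validate each event against the table above, tolerate unknown fields, and defer decoding of the opaque body until it is needed. The sketch below is illustrative only: `parse_event`, `decode_body`, and `REQUIRED_FIELDS` are hypothetical names, and rejecting an unknown `kind` is an assumption (the text only requires ignoring unknown *fields*). Callers should still keep the original raw bytes, since implementations MUST preserve them.

```python
import base64
import json
from typing import Any, Dict, Optional

# Required payload fields per event kind, taken from the table above
# ("ts" and "kind" are common to all kinds; "profile" is optional).
REQUIRED_FIELDS = {
    "open": {"topic", "corr"},
    "policy": {"negotiation_id", "policy"},
    "propose": {"from", "ct", "payload_b64"},
    "counter": {"from", "ct", "payload_b64"},
    "evaluate": {"from", "score", "notes"},
    "decide": {"by", "ct", "result_b64"},
    "abort": {"reason"},
}


def parse_event(raw: bytes) -> Dict[str, Any]:
    """Parse a NEGOTIATION payload; unknown extra fields are left untouched."""
    event = json.loads(raw)
    kind = event.get("kind")
    if kind not in REQUIRED_FIELDS:
        # Assumption: an unknown kind is an error rather than silently dropped.
        raise ValueError(f"unknown negotiation event kind: {kind!r}")
    missing = (REQUIRED_FIELDS[kind] | {"ts"}) - set(event)
    if missing:
        raise ValueError(f"{kind} event missing fields: {sorted(missing)}")
    return event


def decode_body(event: Dict[str, Any]) -> Optional[bytes]:
    """Decode payload_b64 / result_b64 on demand; None for kinds without a body."""
    body_field = {"propose": "payload_b64", "counter": "payload_b64",
                  "decide": "result_b64"}.get(event["kind"])
    return base64.b64decode(event[body_field]) if body_field else None
```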
3.11.0.2 Negotiation Policy and Effective Policy (§17.2)
The Scheduler is the source of truth for negotiation policy. On Open, the Scheduler MUST derive an EffectivePolicy from a base NegotiationPolicy and any clamped AgentPreferences, then broadcast a policy event. Policy MAY be selected by a profile hint provided at Open.
The base NegotiationPolicy includes:
| Field | Type | Description |
|---|---|---|
| max_rounds | u32 | Maximum negotiation rounds |
| score_threshold | f32 (0..1) | Score required for acceptance |
| diff_tolerance | f32 (0..1) | Maximum acceptable structural diff magnitude |
| round_timeout_ms | u64 | Timeout per round in milliseconds |
| token_budget_per_round | u64 | Token budget per round |
| total_token_budget | u64 | Optional total token budget across all rounds |
| oscillation_limit | u32 | Maximum allowed score oscillations before abort |
| hitl | enum | HITL gate: None, PauseBetweenRounds, PauseOnFinalAccept |
| scoring.require_schema_valid | bool | Require JSON Schema validation |
| scoring.require_examples_pass | bool | Require executable examples to pass |
| scoring.llm_weight | f32 | Weight for LLM confidence in blended scoring |
The EffectivePolicy is the scheduler-owned, per-negotiation policy after clamping agent preferences to scheduler guardrails. Implementations MUST persist the effective policy per room and include it in the broadcast.
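Deriving the EffectivePolicy amounts to overlaying agent preferences on the base policy and clamping each one into a scheduler-owned range. The following is a minimal sketch under assumptions: the `GUARDRAILS` ranges, the `effective_policy` function, and the three-field `NegotiationPolicy` subset are hypothetical, chosen only to illustrate clamping.

```python
from dataclasses import dataclass, replace
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class NegotiationPolicy:
    # A subset of the NegotiationPolicy fields from the table above.
    max_rounds: int
    score_threshold: float
    round_timeout_ms: int


# Hypothetical scheduler guardrails: field -> (min, max) an agent may request.
GUARDRAILS: Dict[str, Tuple[float, float]] = {
    "max_rounds": (1, 10),
    "score_threshold": (0.5, 1.0),
    "round_timeout_ms": (1_000, 120_000),
}


def effective_policy(base: NegotiationPolicy,
                     prefs: Dict[str, Optional[float]]) -> NegotiationPolicy:
    """Overlay agent preferences on the base policy, clamped to guardrails."""
    overrides = {}
    for name, value in prefs.items():
        if value is None or name not in GUARDRAILS:
            continue  # unset or non-negotiable preference: keep the base value
        lo, hi = GUARDRAILS[name]
        clamped = min(max(value, lo), hi)
        # Preserve the base field's type so integer fields stay integers.
        overrides[name] = type(getattr(base, name))(clamped)
    return replace(base, **overrides)
```

The result is a new frozen object, which makes it straightforward to persist per room and broadcast unchanged.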
3.11.0.3 Validation, Diff, and Scoring (§17.3)
Implementations SHOULD support early validation of proposals using JSON Schema and executable examples. Invalid drafts MUST be rejected without consuming a round.
Per round, implementations SHOULD compute and record a structural JSON DeltaSummary with:
- `magnitude`: Bounded numerical measure of change size.
- `changed_paths`: Set of JSON paths that changed between rounds.
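One way to compute a DeltaSummary is to flatten both drafts into leaf paths and compare them. In this sketch the magnitude is defined as the fraction of leaf paths that changed, which keeps it in [0, 1] so it can be compared against `diff_tolerance`; that definition, the `$.a.b[0]` path syntax, and the function names are assumptions, not something the protocol prescribes.

```python
from typing import Any, Dict, Set

_MISSING = object()  # sentinel for a path present in only one draft


def leaf_paths(doc: Any, prefix: str = "$") -> Dict[str, Any]:
    """Flatten a JSON value into {json_path: leaf_value}."""
    if isinstance(doc, dict):
        items = [(f"{prefix}.{k}", v) for k, v in doc.items()]
    elif isinstance(doc, list):
        items = [(f"{prefix}[{i}]", v) for i, v in enumerate(doc)]
    else:
        return {prefix: doc}
    out: Dict[str, Any] = {}
    for path, value in items:
        out.update(leaf_paths(value, path))
    return out


def delta_summary(prev: Any, curr: Any) -> Dict[str, Any]:
    """Changed leaf paths plus a magnitude bounded to [0, 1]."""
    a, b = leaf_paths(prev), leaf_paths(curr)
    all_paths: Set[str] = a.keys() | b.keys()
    changed = {p for p in all_paths if a.get(p, _MISSING) != b.get(p, _MISSING)}
    magnitude = len(changed) / len(all_paths) if all_paths else 0.0
    return {"magnitude": magnitude, "changed_paths": sorted(changed)}
```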
Scoring pipeline: Deterministic scoring MUST run first. Optional Inference Engine/LLM confidence in [0,1] MAY be blended per policy llm_weight. Acceptance and stop decisions MUST follow EffectivePolicy thresholds (score, oscillation, token, and time budgets). Optional HITL pause is enforced per policy.
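The pipeline above can be sketched as a linear blend followed by ordered stop checks. Assumptions to note: the blend `(1 - w) * deterministic + w * llm` is one plausible reading of `llm_weight`, an "oscillation" is counted here as a reversal in score direction, and the time budget and HITL pause are left out for brevity. All function names are hypothetical.

```python
from typing import List, Optional


def blended_score(deterministic: float, llm_confidence: Optional[float],
                  llm_weight: float) -> float:
    """Linear blend; the deterministic score stands alone without an LLM."""
    if llm_confidence is None or llm_weight <= 0:
        return deterministic
    w = min(llm_weight, 1.0)
    return (1 - w) * deterministic + w * llm_confidence


def count_oscillations(scores: List[float]) -> int:
    """Count direction reversals (up then down, or down then up)."""
    deltas = [b - a for a, b in zip(scores, scores[1:]) if b != a]
    return sum(1 for x, y in zip(deltas, deltas[1:]) if (x > 0) != (y > 0))


def stop_reason(scores: List[float], score_threshold: float,
                oscillation_limit: int, max_rounds: int,
                tokens_used: int, total_token_budget: Optional[int]) -> Optional[str]:
    """Return why the negotiation should stop, or None to continue."""
    if scores and scores[-1] >= score_threshold:
        return "accepted"
    if count_oscillations(scores) > oscillation_limit:
        return "oscillation"
    if total_token_budget is not None and tokens_used >= total_token_budget:
        return "token_budget"
    if len(scores) >= max_rounds:
        return "max_rounds"
    return None
```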
3.11.0.4 Reports and Artifacts (§17.4)
Implementations SHOULD emit and persist structured records per round:
| Record | Contents |
|---|---|
| EvaluationReport | Deterministic checks, scores, notes |
| DecisionReport | Scores, rationale, stop reason |
| contract_vN.json | Versioned artifact snapshot |
| diff_v{N-1}_to_vN.json | Structural diff between rounds |
Use the Activity Service AppendArtifact RPC to persist these records.
3.11.1 Negotiation Room Pattern
The Negotiation Room pattern provides a structured multi-agent artifact approval workflow where Producers submit artifacts for evaluation by multiple Critic agents, with a Coordinator aggregating votes and making final decisions.
Roles
| Role | Description |
|---|---|
| Producer | Agent that submits an artifact for approval via NegotiationProposal |
| Critic | Agent that evaluates artifacts and submits NegotiationVote with scores and feedback |
| Coordinator | Component (typically the Scheduler) that aggregates votes and issues decisions |
Artifact Types
Implementations support the following artifact categories:
- `REQUIREMENTS`: The artifact contains specifications and requirements documents.
- `PLAN`: The artifact contains implementation plans and architectural designs.
- `CODE`: The artifact contains source code and executable artifacts.
- `DEPLOYMENT`: The artifact contains deployment configurations and infrastructure definitions.
Proposal Flow
```mermaid
sequenceDiagram
    participant P as Producer
    participant S as Scheduler/Coordinator
    participant C1 as Critic 1
    participant C2 as Critic 2
    P->>S: SubmitProposal(artifact)
    S->>C1: Distribute for review
    S->>C2: Distribute for review
    C1-->>S: NegotiationVote(score: 8, passed: true)
    C2-->>S: NegotiationVote(score: 7, passed: true)
    S->>S: Aggregate votes
    S-->>P: NegotiationDecision(APPROVED)
```
Proposal Submission
A Producer submits an artifact via the SubmitProposal RPC with a NegotiationProposal message:
```protobuf
message NegotiationProposal {
  string artifact_type = 1;             // REQUIREMENTS, PLAN, CODE, DEPLOYMENT
  string artifact_id = 2;               // Unique identifier
  bytes artifact = 3;                   // Binary content
  string artifact_content_type = 4;     // MIME type (e.g., application/json)
  repeated string requested_critics = 5; // Critic agent IDs
  string negotiation_room_id = 6;       // Session identifier
}
```
Voting
Critics evaluate artifacts and submit votes containing:
| Field | Type | Description |
|---|---|---|
| score | float | The critic assigns a numerical score from 0 to 10. A score of 10 indicates excellent quality. |
| confidence | float | The critic assigns a confidence level from 0 to 1 representing POMDP uncertainty. |
| passed | bool | The critic indicates whether the artifact meets minimum criteria. |
| strengths | string[] | The critic lists positive aspects of the artifact. |
| weaknesses | string[] | The critic lists areas for improvement. |
| recommendations | string[] | The critic lists suggested changes. |
Vote Aggregation
The Coordinator computes AggregatedScore from all votes:
```python
# Python SDK example
from sw4rm.voting import VotingAggregator, ConfidenceWeightedAggregator

votes = [vote1, vote2, vote3]  # NegotiationVote objects
aggregator = VotingAggregator(ConfidenceWeightedAggregator())
aggregated = aggregator.aggregate(votes)

print(f"Mean score: {aggregated.mean}")
print(f"Weighted mean: {aggregated.weighted_mean}")
print(f"Std deviation: {aggregated.std_dev}")
print(f"Vote count: {aggregated.vote_count}")
```
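For readers who want to see what the aggregate fields could mean numerically, here is a standalone sketch that computes them from plain vote objects. It does not claim to reproduce the SDK's `ConfidenceWeightedAggregator`: weighting each score by its critic's confidence and using the population standard deviation are assumptions, and `Vote`, `AggregatedScore`, and `aggregate` are hypothetical names.

```python
import math
from dataclasses import dataclass
from typing import List


@dataclass
class Vote:
    score: float       # 0..10
    confidence: float  # 0..1
    passed: bool


@dataclass
class AggregatedScore:
    mean: float
    weighted_mean: float
    std_dev: float
    vote_count: int


def aggregate(votes: List[Vote]) -> AggregatedScore:
    if not votes:
        raise ValueError("cannot aggregate zero votes")
    n = len(votes)
    mean = sum(v.score for v in votes) / n
    total_conf = sum(v.confidence for v in votes)
    # Confidence-weighted mean; fall back to the plain mean if all weights are 0.
    weighted = (sum(v.score * v.confidence for v in votes) / total_conf
                if total_conf > 0 else mean)
    # Population standard deviation of the raw scores.
    std = math.sqrt(sum((v.score - mean) ** 2 for v in votes) / n)
    return AggregatedScore(mean, weighted, std, n)
```

With votes of 8 (confidence 0.9) and 6 (confidence 0.3), the plain mean is 7.0 while the weighted mean is 7.5, because the more confident critic counts three times as much.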
Decision Outcomes
| Outcome | Description |
|---|---|
| APPROVED | The artifact meets all thresholds. The workflow proceeds. |
| REVISION_REQUESTED | The artifact requires changes. The Producer should iterate. |
| ESCALATED_TO_HITL | Human judgment is required. The system escalates via HITL (Section 3.4). |
Blocking Wait
For synchronous workflows, Producers can block until a decision is rendered:
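```python
# Wait for a decision, blocking for at most 300 seconds.
# `client` is an SDK client; `artifact_id` identifies the submitted proposal.
decision = client.wait_for_decision(
    artifact_id=artifact_id,
    timeout_s=300
)
```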
Policy-Based Auto-Approval (v0.5.0)
Implementations SHOULD support configurable policy thresholds for automatic decision outcomes:
Approval Thresholds:
| Threshold | Default | Description |
|---|---|---|
| min_score_threshold | 7.0 | Minimum aggregated score for auto-approval |
| min_confidence_threshold | 0.7 | Minimum average confidence required |
| min_pass_ratio | 0.8 | Minimum proportion of critics that passed |
| max_std_dev | 2.0 | Maximum score standard deviation for auto-approval |
The Coordinator SHOULD apply auto-approval when ALL conditions are met:
- `aggregated_score.weighted_mean >= min_score_threshold`
- Average confidence across votes `>= min_confidence_threshold`
- Proportion of votes with `passed=true` `>= min_pass_ratio`
- `aggregated_score.std_dev <= max_std_dev`
The Coordinator MUST escalate to HITL when ANY of these conditions are met:
- Any critic vote has `confidence < 0.3` (high uncertainty)
- `aggregated_score.std_dev > 3.0` (extreme disagreement)
- Any critic explicitly requests escalation via `recommendations`
- Policy timeout expires before sufficient votes are received
Decisions MUST include the policy_version field identifying which policy configuration was applied.
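The rules above can be combined into a single decision function that checks the MUST-escalate conditions first and then the auto-approval conditions. This is a sketch under assumptions: the `"ESCALATE"` marker string for an explicit escalation request is hypothetical, and returning `REVISION_REQUESTED` when neither rule fires is one possible fallback (a Coordinator could instead hold the decision for a human). `CriticVote`, `ApprovalPolicy`, and `decide` are illustrative names; the threshold defaults come from the table above.

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical convention: a critic requests escalation by including this
# exact string in its recommendations list.
ESCALATE_MARKER = "ESCALATE"


@dataclass
class CriticVote:
    score: float
    confidence: float
    passed: bool
    recommendations: List[str] = field(default_factory=list)


@dataclass
class ApprovalPolicy:
    version: str
    min_score_threshold: float = 7.0
    min_confidence_threshold: float = 0.7
    min_pass_ratio: float = 0.8
    max_std_dev: float = 2.0


def decide(votes: List[CriticVote], weighted_mean: float, std_dev: float,
           policy: ApprovalPolicy, timed_out: bool = False) -> dict:
    # MUST-escalate conditions take precedence over auto-approval.
    escalate = (
        timed_out
        or std_dev > 3.0
        or any(v.confidence < 0.3 for v in votes)
        or any(ESCALATE_MARKER in v.recommendations for v in votes)
    )
    if escalate:
        outcome = "ESCALATED_TO_HITL"
    elif not votes:
        raise ValueError("no votes yet; keep waiting")
    else:
        n = len(votes)
        avg_confidence = sum(v.confidence for v in votes) / n
        pass_ratio = sum(v.passed for v in votes) / n
        auto_approve = (
            weighted_mean >= policy.min_score_threshold
            and avg_confidence >= policy.min_confidence_threshold
            and pass_ratio >= policy.min_pass_ratio
            and std_dev <= policy.max_std_dev
        )
        outcome = "APPROVED" if auto_approve else "REVISION_REQUESTED"
    # Every decision records which policy configuration produced it.
    return {"outcome": outcome, "policy_version": policy.version}
```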
3.11.2 Agent Handoff Protocol
The Handoff Protocol enables safe delegation of work between agents when capability requirements change or workload balancing is needed.
Handoff Lifecycle
```mermaid
stateDiagram-v2
    [*] --> PENDING: RequestHandoff
    PENDING --> ACCEPTED: AcceptHandoff
    PENDING --> REJECTED: RejectHandoff
    PENDING --> EXPIRED: Timeout
    ACCEPTED --> COMPLETED: Context transferred
    REJECTED --> [*]: Find alternative
    EXPIRED --> [*]: Retry or escalate
    COMPLETED --> [*]: Success
```
Handoff States
| State | Description |
|---|---|
| PENDING | The request was submitted. The system awaits a response. |
| ACCEPTED | The target agent accepted the handoff. |
| REJECTED | The target agent declined the handoff. The response includes a reason. |
| COMPLETED | The system executed the handoff. The context was transferred. |
| EXPIRED | The timeout was reached before resolution. |
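The lifecycle diagram can be enforced as an explicit transition table, so an illegal move (for example, accepting a handoff that was already rejected) fails loudly. This is a minimal sketch; `HandoffState`, `TRANSITIONS`, `transition`, and `is_terminal` are illustrative names, not SDK API.

```python
from enum import Enum


class HandoffState(Enum):
    PENDING = "PENDING"
    ACCEPTED = "ACCEPTED"
    REJECTED = "REJECTED"
    COMPLETED = "COMPLETED"
    EXPIRED = "EXPIRED"


# Allowed transitions, mirroring the lifecycle diagram.
TRANSITIONS = {
    HandoffState.PENDING: {HandoffState.ACCEPTED, HandoffState.REJECTED,
                           HandoffState.EXPIRED},
    HandoffState.ACCEPTED: {HandoffState.COMPLETED},
    HandoffState.REJECTED: set(),
    HandoffState.EXPIRED: set(),
    HandoffState.COMPLETED: set(),
}


def transition(current: HandoffState, target: HandoffState) -> HandoffState:
    """Return the new state, or raise if the diagram does not allow the move."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal handoff transition {current.name} -> {target.name}")
    return target


def is_terminal(state: HandoffState) -> bool:
    """REJECTED, EXPIRED, and COMPLETED have no outgoing transitions."""
    return not TRANSITIONS[state]
```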
Handoff Request
An originating agent initiates handoff via RequestHandoff:
```protobuf
message HandoffRequest {
  string request_id = 1;                     // Unique identifier
  string from_agent = 2;                     // Originating agent ID
  string to_agent = 3;                       // Target agent ID (or empty for routing)
  string reason = 4;                         // Human-readable explanation
  bytes context_snapshot = 5;                // Serialized execution context
  repeated string capabilities_required = 6; // Required capabilities
  int32 priority = 7;                        // Priority level
  int64 timeout_ms = 8;                      // Maximum wait duration
}
```
Accept/Reject Semantics
```python
# Python SDK example - initiating and responding to a handoff.
# In practice the two halves below run in different agents.
from sw4rm.clients import HandoffClient
from sw4rm.handoff import deserialize_context
from sw4rm.handoff.types import HandoffRequest

client = HandoffClient()

# Originating agent: submit the request (the handoff starts in PENDING).
request = HandoffRequest(
    from_agent="producer-1",
    to_agent="reviewer-1",
    reason="Needs security review",
    context_snapshot=b"...",
    capabilities_required=["security-review"],
)
response = client.request_handoff(request)

# Target agent: accept only if it has every required capability.
# can_handle() and process_handoff_work() are application-defined.
if can_handle(request.capabilities_required):
    client.accept_handoff(response.handoff_id)
    context = deserialize_context(request.context_snapshot)
    process_handoff_work(context)
else:
    client.reject_handoff(
        response.handoff_id,
        reason="Missing required capabilities"
    )
```
Context Transfer
The context_snapshot field carries serialized state necessary for the receiving agent to continue work:
- The originating agent MUST NOT continue work on transferred context after handoff completion.
- Implementations SHOULD define content types for context snapshots.
- Agents SHOULD call `GetPendingHandoffs` periodically after recovery from failures.
3.11.3 Workflow Orchestration
The Workflow Orchestration pattern enables DAG-based multi-agent task coordination where nodes represent agent-executed steps with explicit dependencies.
Workflow Structure
```mermaid
graph LR
    subgraph "Workflow DAG"
        A[Parse Input] --> B[Validate]
        B --> C[Transform]
        B --> D[Enrich]
        C --> E[Aggregate]
        D --> E
        E --> F[Output]
    end
```
Workflow Definition
A WorkflowDefinition comprises:
```protobuf
message WorkflowDefinition {
  string workflow_id = 1;                // Unique identifier
  map<string, WorkflowNode> nodes = 2;   // Node definitions
  map<string, string> metadata = 3;      // Workflow-level configuration
}

message WorkflowNode {
  string node_id = 1;                     // Unique within workflow
  string agent_id = 2;                    // Responsible agent
  repeated string dependencies = 3;       // Must complete before execution
  TriggerType trigger_type = 4;           // Activation mechanism
  map<string, string> input_mapping = 5;  // Workflow state -> node input
  map<string, string> output_mapping = 6; // Node output -> workflow state
}
```
Node Status Transitions
| Status | Description |
|---|---|
| PENDING | The node waits for dependencies to complete. |
| READY | All dependencies are satisfied. The node is eligible for execution. |
| RUNNING | The node is currently executing. |
| COMPLETED | The node finished successfully. |
| FAILED | The node encountered an error. |
| SKIPPED | The node was bypassed due to conditional logic. |
Constraint: Nodes MUST NOT execute until all nodes in their dependencies set have reached COMPLETED status.
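A scheduler can apply this constraint with a simple promotion pass over the node table. The sketch below applies the rule literally, so a node whose dependency ended as FAILED or SKIPPED stays PENDING; how those cases propagate is left open here. `promote_ready` and the string status values are illustrative assumptions.

```python
from typing import Dict, List


def promote_ready(status: Dict[str, str], deps: Dict[str, List[str]]) -> List[str]:
    """Move PENDING nodes whose dependencies are all COMPLETED to READY."""
    promoted = []
    for node, node_deps in deps.items():
        if status[node] == "PENDING" and all(status[d] == "COMPLETED" for d in node_deps):
            status[node] = "READY"
            promoted.append(node)
    return promoted
```

A node promoted in this pass does not unblock its own dependents until it actually reaches COMPLETED, which is what keeps execution in dependency order.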
Trigger Types
| Type | Description |
|---|---|
| EVENT | The node executes when an external event or message arrives. |
| SCHEDULE | The node executes on a time-based schedule (cron-like). |
| MANUAL | The node executes when a user explicitly triggers it. |
| DEPENDENCY | The node executes when all dependencies complete. This is the default. |
Workflow Operations
```python
# Python SDK example
from sw4rm.workflow import WorkflowBuilder, WorkflowEngine, TriggerType

builder = WorkflowBuilder("data_pipeline")
builder.add_node("fetch", "fetcher_agent", TriggerType.EVENT)
builder.add_node("transform", "transformer_agent")  # default DEPENDENCY trigger
builder.add_dependency("fetch", "transform")  # transform runs after fetch
workflow = builder.build()

def executor(node, input_data):
    # Replace with real execution logic
    return {"node": node.node_id, "output": input_data}

engine = WorkflowEngine(workflow, executor=executor)
state = engine.execute(initial_data={"input_file": "/data/source.csv"})

for node_id, node_state in state.node_states.items():
    print(f"{node_id}: {node_state.status}")
```
Input/Output Mapping
Data flows between nodes through shared workflow_data:
- Before execution: The system extracts values from `workflow_data` according to `input_mapping`.
- After completion: The system writes node outputs to `workflow_data` according to `output_mapping`.
Example:
```yaml
nodes:
  parse:
    agent_id: "parser-agent"
    output_mapping:
      parsed_records: "records"   # Node output -> workflow state key
  transform:
    agent_id: "transform-agent"
    dependencies: ["parse"]
    input_mapping:
      input_records: "records"    # Workflow state key -> node input
    output_mapping:
      transformed: "results"
```
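Read literally, both mappings are keyed by the node-side name and point at a workflow state key, as in the YAML example. The sketch below runs one node against a shared `workflow_data` dict under that reading; `run_node` and the `execute` callback are hypothetical stand-ins for the engine and the agent.

```python
from typing import Any, Callable, Dict


def run_node(workflow_data: Dict[str, Any],
             input_mapping: Dict[str, str],
             output_mapping: Dict[str, str],
             execute: Callable[[Dict[str, Any]], Dict[str, Any]]) -> None:
    # input_mapping: node input name -> workflow state key (read before execution)
    node_input = {name: workflow_data[key] for name, key in input_mapping.items()}
    node_output = execute(node_input)
    # output_mapping: node output name -> workflow state key (written after completion)
    for name, key in output_mapping.items():
        workflow_data[key] = node_output[name]
```

Running `parse` and then `transform` from the YAML example leaves both `records` and `results` in `workflow_data`, with `transform` reading the value `parse` wrote.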
DAG Validation and Cycle Detection (v0.5.0)
Implementations MUST validate workflow definitions to ensure they form valid Directed Acyclic Graphs (DAGs). Validation MUST occur during CreateWorkflow before the workflow is persisted.
Cycle Detection:
- Implementations MUST detect cycles in the dependency graph before accepting a workflow definition.
- When a cycle is detected, `CreateWorkflow` MUST fail with `error_code=workflow_cycle_detected`.
- The error response SHOULD include the nodes involved in the cycle to aid debugging.
Validation Algorithm: Implementations SHOULD use topological sort (Kahn's algorithm or DFS-based) to detect cycles. The algorithm MUST complete in O(V + E) time where V is the number of nodes and E is the number of dependency edges.
Additional Validation Rules:
- All `node_id` values referenced in `dependencies` MUST exist in the workflow's `nodes` map.
- Self-referential dependencies (a node depending on itself) MUST be rejected.
- Implementations SHOULD warn about (but MAY accept) workflows with unreachable nodes.
- A valid workflow MUST have at least one root node (a node with empty `dependencies`).
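The rules above fit in one O(V + E) pass: reference checks, then a three-color depth-first search that reports the actual cycle path, then the root-node check. This sketch uses DFS rather than Kahn's algorithm because DFS yields the exact nodes on the cycle. Only `workflow_cycle_detected` comes from the text; the other error codes, the `WorkflowValidationError` class, and the `validate_workflow` function are hypothetical. The recursion depth grows with the longest dependency chain, so very deep workflows would need an iterative variant.

```python
from typing import Dict, List, Optional


class WorkflowValidationError(ValueError):
    """Hypothetical error type carrying an error_code and the offending nodes."""

    def __init__(self, error_code: str, nodes: List[str]):
        super().__init__(f"{error_code}: {nodes}")
        self.error_code = error_code
        self.nodes = nodes


def validate_workflow(deps: Dict[str, List[str]]) -> None:
    """Validate a node_id -> dependencies map; raise on the first violation."""
    for node, node_deps in deps.items():
        if node in node_deps:
            raise WorkflowValidationError("workflow_self_dependency", [node])
        missing = [d for d in node_deps if d not in deps]
        if missing:
            raise WorkflowValidationError("workflow_unknown_dependency", missing)

    # DFS colors: 0 = unvisited, 1 = on the current path, 2 = fully explored.
    color = {node: 0 for node in deps}
    path: List[str] = []

    def visit(node: str) -> Optional[List[str]]:
        color[node] = 1
        path.append(node)
        for dep in deps[node]:
            if color[dep] == 1:
                # Back edge: the cycle is the path suffix starting at dep.
                return path[path.index(dep):]
            if color[dep] == 0:
                cycle = visit(dep)
                if cycle:
                    return cycle
        path.pop()
        color[node] = 2
        return None

    for node in deps:
        if color[node] == 0:
            cycle = visit(node)
            if cycle:
                raise WorkflowValidationError("workflow_cycle_detected", cycle)

    # An acyclic, non-empty graph always has a root; this catches empty workflows.
    if not any(not node_deps for node_deps in deps.values()):
        raise WorkflowValidationError("workflow_no_root", [])
```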
3.11.4 Three-ID Model
The SW4RM messaging model uses three distinct identifiers to enable reliable message processing with retry safety and correlation tracking.
Identifier Types
| Identifier | Purpose | Scope | Mutability |
|---|---|---|---|
| message_id | Unique per transmission attempt | Per-attempt | New on each retry |
| correlation_id | Links request-response pairs | Conversation/workflow | Stable across related messages |
| idempotency_token | Enables exactly-once processing | Retry window | Stable across retries |
Message ID
- Format: The system generates a UUIDv4 per transmission.
- Purpose: The ID uniquely identifies each message attempt.
- Behavior: The system generates a new `message_id` on every send, including retries.
Correlation ID
- Format: The caller provides a string, typically a UUIDv4.
- Purpose: The ID links related messages in a conversation or workflow.
- Behavior: Response messages copy the `correlation_id` from the request.
```python
# Request
request = build_envelope(
    message_type=MessageType.DATA,
    correlation_id="workflow-123",
    payload=task_data
)

# Response uses same correlation_id
response = build_envelope(
    message_type=MessageType.DATA,
    correlation_id="workflow-123",  # Same as request
    payload=result_data
)
```
Idempotency Token
- Format: The caller provides a deterministic string.
- Purpose: The token enables safe retry of operations.
- Behavior: The Router deduplicates messages with the same token within the deduplication window.
```python
# Generate deterministic token for retry safety
# (agent_id, task_id, and content_hash are supplied by the caller)
token = f"{agent_id}:{task_id}:{content_hash}"
envelope = build_envelope(
    message_type=MessageType.DATA,
    idempotency_token=token,  # Same token on retries
    payload=data
)
```
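The `content_hash` part of the token must be identical on every retry, so it is worth hashing a canonical serialization rather than whatever bytes happen to be at hand. The sketch below assumes the payload is JSON-serializable and uses SHA-256 over sorted-key, whitespace-free JSON; the hash choice and the `idempotency_token` helper name are assumptions, not protocol requirements.

```python
import hashlib
import json
from typing import Any


def idempotency_token(agent_id: str, task_id: str, payload: Any) -> str:
    """Build an agent_id:task_id:content_hash token that is stable across retries."""
    # Canonical JSON (sorted keys, no whitespace) so equal payloads hash equally
    # regardless of key order.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    content_hash = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{agent_id}:{task_id}:{content_hash}"
```

Two payloads that differ only in key order produce the same token, while any change to a value produces a different one.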
Deduplication Window
- The default window is 3600 seconds (1 hour).
- Messages with identical `idempotency_token` within the window return `DUPLICATE_DETECTED`.
- Implementations SHOULD configure the window based on expected retry patterns.
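The window behavior can be illustrated with a small in-memory deduplicator that remembers when each token was first seen. This is a sketch, not the Router's implementation: the `ACCEPTED` result label and the `Deduplicator` class are hypothetical, the window is measured from first sight (a duplicate does not extend it), and expired tokens are pruned on every call for simplicity, which a production Router would do more efficiently. The injectable clock makes the window testable.

```python
import time
from typing import Callable, Dict


class Deduplicator:
    """In-memory sketch of idempotency-token deduplication within a time window."""

    def __init__(self, window_s: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic):
        self.window_s = window_s
        self.clock = clock
        self._seen: Dict[str, float] = {}  # token -> first-seen timestamp

    def check(self, token: str) -> str:
        now = self.clock()
        # Drop tokens whose window has elapsed so memory stays bounded.
        self._seen = {t: ts for t, ts in self._seen.items()
                      if now - ts < self.window_s}
        if token in self._seen:
            return "DUPLICATE_DETECTED"
        self._seen[token] = now
        return "ACCEPTED"
```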
See Also
- Protocol Specification - Core protocol concepts
- Services Reference - Complete service API reference
- ACK Lifecycle - Acknowledgment handling patterns
- Messages - Message type specifications