Skip to content

6.16. Workflow Client

The Workflow Client orchestrates DAG-based multi-agent workflows. Use it to:

  • Define workflows as directed acyclic graphs (DAGs) of agent nodes.
  • Start workflow instances and track node-level execution state.
  • Cancel workflows or update node status during execution.
  • Validate dependencies and detect cycles before execution.

Current SDKs provide in-memory implementations. JavaScript/TypeScript and Rust store definitions and instances locally, while Python's WorkflowClient is an alias for the in-process WorkflowEngine. A gRPC WorkflowService is defined in protos/workflow.proto but is not yet wired into SDK clients.

6.16.1. Service Overview

The service exposes four RPCs:

  • CreateWorkflow(CreateWorkflowRequest) -> CreateWorkflowResponse
  • StartWorkflow(StartWorkflowRequest) -> StartWorkflowResponse
  • GetWorkflowState(GetWorkflowStateRequest) -> GetWorkflowStateResponse
  • ResumeWorkflow(ResumeWorkflowRequest) -> ResumeWorkflowResponse

Field names are snake_case in Python/Rust and camelCase in JS/TS.

NodeStatus values

Value Description
PENDING Node waiting for dependencies.
READY Node ready to execute (JS/TS + Rust).
RUNNING Node currently executing.
COMPLETED Node finished successfully.
FAILED Node encountered an error.
SKIPPED Node skipped due to conditional logic.

TriggerType values

Value Description
EVENT Triggered by specific events.
SCHEDULE Triggered on a schedule (cron-like).
MANUAL Triggered by explicit user action.
DEPENDENCY Triggered by dependency completion (JS/TS + Rust).

WorkflowDefinition fields

Field Type Description
workflow_id string Unique workflow identifier.
nodes map Node definitions keyed by node_id.
created_at timestamp Creation time (auto-set if omitted).
metadata map Workflow-level metadata.

WorkflowState / WorkflowInstance fields

Field Type Description
workflow_id string Workflow identifier.
node_states map Node execution status map.
workflow_data string Shared workflow data (JSON string).
started_at timestamp Start time.
completed_at timestamp Completion time.
metadata map Runtime metadata.

6.16.2. Constructors

Python

WorkflowClient(workflow: WorkflowDefinition, executor: Callable[[WorkflowNode, dict], dict] | None = None)

  • workflow: The workflow definition to execute in-process.
  • executor: Optional callable to execute nodes; if omitted, nodes are marked completed without execution.

JavaScript/TypeScript

new WorkflowClient()

  • In-memory client that stores definitions and instances in-process.

Rust

WorkflowClient::new() -> WorkflowClient

  • In-memory client backed by an internal workflow store.

6.16.3. Key Methods

Python's WorkflowClient is an engine for a single workflow definition, while JS/TS and Rust expose a registry of definitions and instances. The sections below call out the available methods per SDK.

Definition registry (JavaScript/TypeScript + Rust)

create_workflow / createWorkflow

JavaScript/TypeScript createWorkflow(definition: WorkflowDefinition): Promise<string>

Rust create_workflow(&self, definition: WorkflowDefinition) -> Result<String>

start_workflow / startWorkflow

JavaScript/TypeScript startWorkflow(workflowId: string, initialData?: string, metadata?: Record<string, string>): Promise<WorkflowInstance>

Rust start_workflow(&self, workflow_id: &str) -> Result<WorkflowInstance>

get_workflow_status / getWorkflowStatus

JavaScript/TypeScript getWorkflowStatus(instanceId: string): Promise<WorkflowInstance>

Rust get_workflow_status(&self, instance_id: &str) -> Result<WorkflowInstance>

cancel_workflow / cancelWorkflow

JavaScript/TypeScript cancelWorkflow(instanceId: string): Promise<void>

Rust cancel_workflow(&self, instance_id: &str) -> Result<()>

update_node_state / updateNodeState

JavaScript/TypeScript updateNodeState(instanceId: string, nodeId: string, status: NodeStatus, output?: string, error?: string): Promise<void>

Rust update_node_state(&self, instance_id: &str, node_id: &str, status: NodeStatus, output: Option<String>, error: Option<String>) -> Result<()>

get_workflow_definition / getWorkflowDefinition

JavaScript/TypeScript getWorkflowDefinition(workflowId: string): Promise<WorkflowDefinition | null>

Rust get_workflow_definition(&self, workflow_id: &str) -> Result<Option<WorkflowDefinition>>

list_workflows / listWorkflowInstances

JavaScript/TypeScript listWorkflowInstances(workflowId: string): Promise<WorkflowInstance[]>

Rust list_workflows(&self) -> Result<Vec<String>>

Execution engine (Python)

execute

Python execute(initial_data: dict[str, Any] | None = None, checkpoint_interval: int | None = None) -> WorkflowState

resume_from_checkpoint

Python resume_from_checkpoint(state: WorkflowState) -> WorkflowState

get_execution_plan

Python get_execution_plan() -> list[set[str]]

For a Python-focused API reference and engine behavior details, see the Workflow Engine documentation.

6.16.4. WorkflowBuilder (Python)

WorkflowBuilder provides a fluent API for assembling WorkflowDefinition DAGs with validation. It is available from sw4rm.workflow.

Constructor

WorkflowBuilder(workflow_id: str)

  • workflow_id: Non-empty workflow identifier.

Raises WorkflowBuilderError if the workflow ID is empty.

Node and dependency management

add_node(node_id: str, agent_id: str, trigger_type: TriggerType = TriggerType.EVENT) -> WorkflowBuilder

  • Adds a node with optional trigger type (default: EVENT).
  • Raises WorkflowBuilderError for empty IDs or duplicate node_id.

add_dependency(from_node: str, to_node: str) -> WorkflowBuilder

  • Adds a dependency edge so to_node waits for from_node.
  • Raises WorkflowBuilderError if either node is missing or identical.

add_dependencies(from_nodes: list[str], to_node: str) -> WorkflowBuilder

  • Convenience wrapper to add multiple dependencies to one node.

remove_node(node_id: str) -> WorkflowBuilder

  • Removes a node and clears any dependencies referencing it.
  • Raises WorkflowBuilderError if the node does not exist.

Triggers, mappings, and metadata

set_trigger(node_id: str, trigger_type: TriggerType) -> WorkflowBuilder

set_input_mapping(node_id: str, input_mapping: dict[str, str]) -> WorkflowBuilder

set_output_mapping(node_id: str, output_mapping: dict[str, str]) -> WorkflowBuilder

set_node_metadata(node_id: str, metadata: dict[str, Any]) -> WorkflowBuilder

set_workflow_metadata(metadata: dict[str, Any]) -> WorkflowBuilder

All mapping/metadata setters raise WorkflowBuilderError if the node is missing.

Build and inspection

build(validate: bool = True) -> WorkflowDefinition

  • Returns an immutable WorkflowDefinition.
  • When validate=True, the builder constructs a WorkflowEngine to detect cycles.
  • Validation failures raise WorkflowBuilderError; the underlying cause is a CycleDetectedError with the cycle path in its message.

clone() -> WorkflowBuilder

  • Returns a deep copy of the builder (nodes, metadata, and mappings).

get_node_count() -> int

has_node(node_id: str) -> bool

get_node_dependencies(node_id: str) -> set[str]

get_node_dependencies raises WorkflowBuilderError if the node does not exist.

6.16.5. Usage Examples

from sw4rm.clients import WorkflowClient
from sw4rm.workflow import WorkflowBuilder, TriggerType

builder = WorkflowBuilder("review_flow")
builder.add_node("draft", "writer", TriggerType.MANUAL)
builder.add_node("review", "critic")
builder.add_dependency("draft", "review")

workflow = builder.build()
engine = WorkflowClient(workflow)

state = engine.execute(initial_data={"doc_id": "doc-1"})
for node_id, node_state in state.node_states.items():
    print(node_id, node_state.status)
import { WorkflowClient, TriggerType, NodeStatus } from '@sw4rm/js-sdk';

const client = new WorkflowClient();

const workflowId = await client.createWorkflow({
  workflowId: 'review-flow',
  nodes: {
    draft: {
      nodeId: 'draft',
      agentId: 'writer',
      dependencies: [],
      triggerType: TriggerType.MANUAL,
      inputMapping: {},
      outputMapping: { draft: 'draft_text' },
      metadata: {},
    },
    review: {
      nodeId: 'review',
      agentId: 'critic',
      dependencies: ['draft'],
      triggerType: TriggerType.DEPENDENCY,
      inputMapping: { draft: 'draft_text' },
      outputMapping: { verdict: 'review_verdict' },
      metadata: {},
    },
  },
  metadata: {},
});

const instance = await client.startWorkflow(
  workflowId,
  JSON.stringify({ doc_id: 'doc-1' })
);

await client.updateNodeState(
  instance.instanceId,
  'draft',
  NodeStatus.COMPLETED,
  JSON.stringify({ draft: 'ready' })
);

const status = await client.getWorkflowStatus(instance.instanceId);
console.log(status.status, status.nodeStates.review.status);
use std::collections::HashMap;
use sw4rm_sdk::clients::workflow::{NodeStatus, TriggerType, WorkflowClient, WorkflowDefinition, WorkflowNode};

fn main() -> sw4rm_sdk::Result<()> {
    let client = WorkflowClient::new();

    let draft = WorkflowNode::new("draft".to_string(), "writer".to_string())?
        .with_trigger(TriggerType::Manual);
    let review = WorkflowNode::new("review".to_string(), "critic".to_string())?
        .with_dependency("draft".to_string())
        .with_trigger(TriggerType::Dependency);

    let mut nodes = HashMap::new();
    nodes.insert("draft".to_string(), draft);
    nodes.insert("review".to_string(), review);

    let definition = WorkflowDefinition::new("review-flow".to_string(), nodes)?;
    let workflow_id = client.create_workflow(definition)?;
    let instance = client.start_workflow(&workflow_id)?;

    client.update_node_state(
        &instance.instance_id,
        "draft",
        NodeStatus::Completed,
        Some("{\"draft\":\"ready\"}".to_string()),
        None,
    )?;

    Ok(())
}

Working Examples

For complete runnable examples demonstrating Workflow usage:

6.16.5. Error Handling

  • JavaScript/TypeScript throws WorkflowValidationError for invalid definitions and WorkflowCycleError for cycles; missing workflows or instances raise WorkflowValidationError.
  • Rust returns Error::Config for invalid definitions, missing workflows, and missing instances.
  • Python raises WorkflowValidationError or CycleDetectedError during DAG validation in the engine.