Workflow API

Automatic workflow tracking and execution context management

The Workflow API provides automatic execution tracking, DAG (Directed Acyclic Graph) building, and parent-child relationship management for all reasoner and skill calls. Every execution is tracked with metadata, timing information, and hierarchical relationships.

Automatic Workflow Tracking

All reasoners decorated with @app.reasoner automatically participate in workflow tracking. No additional configuration is required.

from agentfield import Agent

app = Agent(node_id="workflow_agent")

@app.reasoner
async def parent_task(data: str) -> dict:
    # parent_task is the root execution node; each app.call below
    # creates a child execution node under it
    result1 = await app.call("agent1.process", data=data)

    # Second child node (receives the first call's result)
    result2 = await app.call("agent2.analyze", result=result1)

    return {"result1": result1, "result2": result2}

Workflow DAG Generated:

{
  "workflow_id": "wf-abc123",
  "nodes": [
    {
      "execution_id": "exec-parent",
      "agent_node_id": "workflow_agent",
      "reasoner_name": "parent_task",
      "status": "completed",
      "duration_ms": 2500
    },
    {
      "execution_id": "exec-child1",
      "agent_node_id": "agent1",
      "reasoner_name": "process",
      "parent_execution_id": "exec-parent",
      "status": "completed",
      "duration_ms": 1200
    },
    {
      "execution_id": "exec-child2",
      "agent_node_id": "agent2",
      "reasoner_name": "analyze",
      "parent_execution_id": "exec-parent",
      "status": "completed",
      "duration_ms": 800
    }
  ],
  "edges": [
    { "from": "exec-parent", "to": "exec-child1" },
    { "from": "exec-parent", "to": "exec-child2" }
  ]
}
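
To make the node and edge structure concrete, here is a minimal sketch that turns a DAG payload of this shape into an indented tree. It assumes the payload looks exactly like the example above; the helper name render_tree is hypothetical and not part of the SDK.

```python
import json
from collections import defaultdict

# Trimmed copy of the DAG shown above (only the fields this sketch reads).
dag_json = """
{
  "workflow_id": "wf-abc123",
  "nodes": [
    {"execution_id": "exec-parent", "agent_node_id": "workflow_agent",
     "reasoner_name": "parent_task", "duration_ms": 2500},
    {"execution_id": "exec-child1", "agent_node_id": "agent1",
     "reasoner_name": "process", "parent_execution_id": "exec-parent", "duration_ms": 1200},
    {"execution_id": "exec-child2", "agent_node_id": "agent2",
     "reasoner_name": "analyze", "parent_execution_id": "exec-parent", "duration_ms": 800}
  ],
  "edges": [
    {"from": "exec-parent", "to": "exec-child1"},
    {"from": "exec-parent", "to": "exec-child2"}
  ]
}
"""


def render_tree(dag: dict) -> list[str]:
    """Return one indented line per node, parents before children."""
    nodes = {n["execution_id"]: n for n in dag["nodes"]}
    children = defaultdict(list)
    for edge in dag["edges"]:
        children[edge["from"]].append(edge["to"])

    # Roots are nodes that never appear as an edge target.
    targets = {edge["to"] for edge in dag["edges"]}
    roots = [eid for eid in nodes if eid not in targets]

    lines: list[str] = []

    def walk(eid: str, depth: int) -> None:
        node = nodes[eid]
        label = f'{node["agent_node_id"]}.{node["reasoner_name"]} ({node["duration_ms"]} ms)'
        lines.append("  " * depth + label)
        for child in children[eid]:
            walk(child, depth + 1)

    for root in roots:
        walk(root, 0)
    return lines


tree_lines = render_tree(json.loads(dag_json))
print("\n".join(tree_lines))
```

The sketch reads only the edges list to find parents; the parent_execution_id field on each node carries the same information and could be used instead.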

Workflow tracking is enabled by default for all reasoners and cannot be disabled in the current Python SDK.

Execution Context

The ExecutionContext class carries workflow metadata through the execution chain. It's automatically created and propagated for all reasoner calls.

Context Properties

from typing import Optional

from agentfield.execution_context import ExecutionContext

@app.reasoner
async def context_aware(
    data: str,
    execution_context: Optional[ExecutionContext] = None
) -> dict:
    if execution_context:
        return {
            "execution_id": execution_context.execution_id,
            "workflow_id": execution_context.workflow_id,
            "parent_execution_id": execution_context.parent_execution_id,
            "agent_node_id": execution_context.agent_node_id,
            "reasoner_name": execution_context.reasoner_name
        }
    return {"error": "No execution context"}

Key Properties:

  • execution_id - Unique identifier for this execution
  • workflow_id - Identifier for the entire workflow
  • parent_execution_id - Parent execution (if this is a child call)
  • agent_node_id - Agent that owns this execution
  • reasoner_name - Name of the reasoner being executed
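
One common use of these properties is building a log prefix that ties each message to its workflow. The sketch below uses a hypothetical stand-in dataclass (ContextStandIn) rather than the real ExecutionContext, so it runs without the SDK. It also assumes that parent_execution_id is None for a root execution, which the list above implies but does not state.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ContextStandIn:
    """Hypothetical stand-in with the five properties listed above (not the SDK class)."""
    execution_id: str
    workflow_id: str
    agent_node_id: str
    reasoner_name: str
    parent_execution_id: Optional[str] = None  # assumed to be None for root executions


def describe(ctx) -> str:
    """Build a log prefix; works with any object exposing the listed properties."""
    if ctx.parent_execution_id is None:
        role = "root"
    else:
        role = f"child of {ctx.parent_execution_id}"
    return f"[{ctx.workflow_id}/{ctx.execution_id}] {ctx.agent_node_id}.{ctx.reasoner_name} ({role})"


root = ContextStandIn("exec-parent", "wf-abc123", "workflow_agent", "parent_task")
child = ContextStandIn("exec-child1", "wf-abc123", "agent1", "process", "exec-parent")

root_line = describe(root)
child_line = describe(child)
print(root_line)
print(child_line)
```

Because describe only reads attributes, the same function should work unchanged when passed the execution_context argument of a reasoner.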

Parent-Child Relationships

Cross-agent calls automatically create parent-child relationships in the workflow DAG.

@app.reasoner
async def orchestrator(ticket_text: str) -> dict:
    """Parent task that coordinates multiple agents."""

    # Child execution 1
    sentiment = await app.call(
        "sentiment_agent.analyze",
        text=ticket_text
    )

    # Child execution 2
    priority = await app.call(
        "priority_agent.classify",
        ticket_text=ticket_text
    )

    # Child execution 3 (depends on previous results)
    routing = await app.call(
        "routing_agent.route",
        sentiment=sentiment,
        priority=priority
    )

    return {
        "sentiment": sentiment,
        "priority": priority,
        "routing": routing
    }

Resulting DAG:

orchestrator (parent)
├── sentiment_agent.analyze (child 1)
├── priority_agent.classify (child 2)
└── routing_agent.route (child 3)

Workflow Visualization

The Agentfield UI automatically visualizes workflow DAGs showing:

  • Execution flow and dependencies
  • Timing information for each node
  • Success/failure status
  • Input/output data for each execution

The examples below show common call patterns and the DAG each one produces.

@app.reasoner
async def sequential_workflow(data: str) -> dict:
    # Step 1
    processed = await app.call(
        "processor.process",
        data=data
    )

    # Step 2 (depends on step 1)
    analyzed = await app.call(
        "analyzer.analyze",
        data=processed
    )

    # Step 3 (depends on step 2)
    final = await app.call(
        "finalizer.finalize",
        data=analyzed
    )

    return final

DAG: sequential_workflow calls processor → analyzer → finalizer in order; each call is recorded as a child of sequential_workflow

import asyncio

@app.reasoner
async def parallel_workflow(data: str) -> dict:
    # Execute in parallel (no dependencies between the three calls)
    results = await asyncio.gather(
        app.call("agent1.process", data=data),
        app.call("agent2.process", data=data),
        app.call("agent3.process", data=data)
    )

    return {"results": results}

DAG: Three parallel branches from root
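
The timing difference between the sequential and parallel patterns can be checked without any agents running. The following sketch replaces app.call with a hypothetical fake_call coroutine that only sleeps; the 0.1 second delay is an arbitrary stand-in for real work.

```python
import asyncio
import time


async def fake_call(target: str, delay: float) -> str:
    """Stand-in for app.call: waits, then returns a label. Not the SDK."""
    await asyncio.sleep(delay)
    return f"{target}:done"


async def run_sequential() -> list[str]:
    # Each call starts only after the previous one has finished.
    return [await fake_call(f"agent{i}.process", 0.1) for i in (1, 2, 3)]


async def run_parallel() -> list[str]:
    # gather returns results in argument order, regardless of completion order.
    results = await asyncio.gather(
        *(fake_call(f"agent{i}.process", 0.1) for i in (1, 2, 3))
    )
    return list(results)


def timed(coro_factory):
    """Run a coroutine function to completion and return (result, seconds elapsed)."""
    start = time.perf_counter()
    result = asyncio.run(coro_factory())
    return result, time.perf_counter() - start


seq_result, seq_elapsed = timed(run_sequential)
par_result, par_elapsed = timed(run_parallel)
print(f"sequential: {seq_elapsed:.2f}s, parallel: {par_elapsed:.2f}s")
```

Both variants return the same list; only the wall-clock time differs, since the parallel version waits roughly as long as its slowest call rather than the sum of all calls.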

import asyncio

@app.reasoner
async def complex_workflow(data: str) -> dict:
    # Phase 1: Parallel processing
    phase1 = await asyncio.gather(
        app.call("agent1.extract", data=data),
        app.call("agent2.extract", data=data)
    )

    # Phase 2: Combine results
    combined = await app.call(
        "combiner.combine",
        results=phase1
    )

    # Phase 3: Final analysis
    final = await app.call(
        "analyzer.analyze",
        data=combined
    )

    return final

DAG: Diamond pattern with parallel start, merge, then final step

Execution Metadata

Every execution automatically tracks:

  • Timing: Start time, end time, duration in milliseconds
  • Status: queued, running, completed, failed
  • Input/Output: Function arguments and return values
  • Errors: Exception messages and stack traces (if failed)
  • Context: Workflow ID, parent relationships, agent information

@app.reasoner
async def tracked_execution(data: str) -> dict:
    # Execution metadata automatically captured:
    # - Start timestamp
    # - Input: {"data": "..."}
    # - Agent: workflow_agent
    # - Reasoner: tracked_execution

    result = await app.ai(user=data)

    # Execution metadata automatically captured:
    # - End timestamp
    # - Duration: calculated automatically
    # - Output: result
    # - Status: completed

    return result
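
To illustrate what capturing this metadata involves, here is a rough sketch of a wrapper that records timing, status, input, output, and errors around an async function. It is not how the SDK implements tracking; the track decorator and the in-memory records list are hypothetical, and the sketch only handles keyword arguments.

```python
import asyncio
import functools
import time
import traceback

records = []  # in-memory stand-in for wherever execution metadata would be stored


def track(func):
    """Hypothetical wrapper that records timing, status, input, output, and errors."""
    @functools.wraps(func)
    async def wrapper(**kwargs):
        record = {"reasoner": func.__name__, "input": kwargs, "status": "running"}
        records.append(record)
        start = time.perf_counter()
        try:
            record["output"] = await func(**kwargs)
            record["status"] = "completed"
            return record["output"]
        except Exception as exc:
            record["status"] = "failed"
            record["error"] = str(exc)
            record["stack_trace"] = traceback.format_exc()
            raise  # the caller still sees the original exception
        finally:
            record["duration_ms"] = round((time.perf_counter() - start) * 1000)
    return wrapper


@track
async def shout(data: str) -> dict:
    return {"text": data.upper()}


@track
async def broken(data: str) -> dict:
    raise ValueError("bad input")


async def main() -> None:
    await shout(data="hello")
    try:
        await broken(data="x")
    except ValueError:
        pass  # the failure is already captured in records


asyncio.run(main())
for record in records:
    print(record["reasoner"], record["status"], record["duration_ms"])
```

The finally clause is what guarantees a duration is recorded for failed executions as well as completed ones.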

Workflow Tracking Behavior

All @app.reasoner executions participate in workflow tracking, DAG building, and execution context propagation. There is no decorator flag to disable this behavior in the Python SDK.

Cross-Agent Workflow Coordination

Workflows can span multiple agents, with the Agentfield server coordinating execution and maintaining the complete DAG.

# Agent 1: Data Collector
app1 = Agent(node_id="collector")

@app1.reasoner
async def collect_data(source: str) -> dict:
    # Collect data from source (fetch_from_source is a placeholder for your own helper)
    data = await fetch_from_source(source)

    # Pass to processor agent
    processed = await app1.call(
        "processor.process",
        data=data
    )

    return processed

# Agent 2: Data Processor
app2 = Agent(node_id="processor")

@app2.reasoner
async def process(data: dict) -> dict:
    # Process the data
    result = await app2.ai(user=f"Process: {data}")

    # Pass to analyzer agent
    analyzed = await app2.call(
        "analyzer.analyze",
        data=result
    )

    return analyzed

# Agent 3: Data Analyzer
app3 = Agent(node_id="analyzer")

@app3.reasoner
async def analyze(data: dict) -> dict:
    # Final analysis
    return await app3.ai(user=f"Analyze: {data}")

Complete Workflow DAG:

collector.collect_data (Agent 1)
└── processor.process (Agent 2)
    └── analyzer.analyze (Agent 3)

Workflow Querying

Query workflow execution history via the Agentfield REST API:

# Get workflow details
curl http://localhost:8080/api/v1/workflows/wf-abc123

# Get execution details
curl http://localhost:8080/api/v1/executions/exec-xyz789

# List all executions in a workflow
curl http://localhost:8080/api/v1/workflows/wf-abc123/executions
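
The same endpoints can be called from Python using only the standard library. This is a sketch under two assumptions: the server listens on localhost:8080 as in the curl examples, and responses are JSON. The helper names are hypothetical, and the response schema is not specified here, so the sketch returns the decoded body as-is.

```python
import json
import urllib.request
from urllib.parse import quote

BASE_URL = "http://localhost:8080/api/v1"  # host and port taken from the curl examples


def workflow_url(workflow_id: str) -> str:
    """URL for a single workflow's details."""
    return f"{BASE_URL}/workflows/{quote(workflow_id, safe='')}"


def workflow_executions_url(workflow_id: str) -> str:
    """URL listing all executions in a workflow."""
    return f"{workflow_url(workflow_id)}/executions"


def execution_url(execution_id: str) -> str:
    """URL for a single execution's details."""
    return f"{BASE_URL}/executions/{quote(execution_id, safe='')}"


def fetch_json(url: str, timeout: float = 10.0):
    """GET a URL and decode the body as JSON (assumes the server replies with JSON)."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Usage against a running server:
#   workflow = fetch_json(workflow_url("wf-abc123"))
#   executions = fetch_json(workflow_executions_url("wf-abc123"))
print(workflow_url("wf-abc123"))
```

Quoting the identifiers keeps the URL valid even if an ID contains characters such as a slash or a space.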

Real-Time Workflow Updates

Subscribe to workflow events via Server-Sent Events (SSE):

curl -N -H "Accept: text/event-stream" \
  http://localhost:8080/api/v1/workflows/wf-abc123/events

Event Stream:

event: workflow.started
data: {"workflow_id": "wf-abc123", "status": "running"}

event: execution.started
data: {"execution_id": "exec-1", "reasoner": "process"}

event: execution.completed
data: {"execution_id": "exec-1", "status": "completed", "duration_ms": 1200}

event: workflow.completed
data: {"workflow_id": "wf-abc123", "status": "completed", "total_duration_ms": 2500}
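
A client consuming this stream needs to split it into events and decode each payload. The sketch below parses the event and data fields of the sample stream above; parse_sse is a hypothetical helper, and it ignores other SSE fields such as id and retry.

```python
import json


def parse_sse(stream_text: str) -> list[tuple[str, dict]]:
    """Parse 'event:'/'data:' blocks separated by blank lines into (event, payload) pairs."""
    events = []
    event_name, data_lines = None, []
    for line in stream_text.splitlines() + [""]:  # trailing "" flushes the last block
        if line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif line == "" and data_lines:
            # A blank line ends the event; SSE uses "message" when no event name is given.
            payload = json.loads("\n".join(data_lines))
            events.append((event_name or "message", payload))
            event_name, data_lines = None, []
    return events


sample = """event: workflow.started
data: {"workflow_id": "wf-abc123", "status": "running"}

event: execution.started
data: {"execution_id": "exec-1", "reasoner": "process"}

event: execution.completed
data: {"execution_id": "exec-1", "status": "completed", "duration_ms": 1200}

event: workflow.completed
data: {"workflow_id": "wf-abc123", "status": "completed", "total_duration_ms": 2500}
"""

events = parse_sse(sample)
for name, payload in events:
    print(name, payload)
```

For a live connection the same logic would be applied line by line as data arrives, rather than to a complete string.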

Best Practices

1. Use Descriptive Reasoner Names

# Good: Clear, descriptive names
@app.reasoner
async def analyze_customer_sentiment(message: str) -> dict:
    pass

# Avoid: Generic names
@app.reasoner
async def process(data: str) -> dict:
    pass

2. Structure Complex Workflows

# Break complex workflows into logical steps
@app.reasoner
async def ticket_analysis_workflow(ticket: dict) -> dict:
    # Step 1: Extract information
    extracted = await app.call(
        "extractor.extract_ticket_info",
        ticket=ticket
    )

    # Step 2: Classify priority
    priority = await app.call(
        "classifier.classify_priority",
        info=extracted
    )

    # Step 3: Route to team
    routing = await app.call(
        "router.route_to_team",
        priority=priority,
        info=extracted
    )

    return {
        "extracted": extracted,
        "priority": priority,
        "routing": routing
    }

3. Handle Errors Gracefully

@app.reasoner
async def robust_workflow(data: str) -> dict:
    try:
        result = await app.call("agent1.process", data=data)
        return {"status": "success", "result": result}
    except Exception as e:
        # Error automatically tracked in workflow DAG
        return {"status": "error", "message": str(e)}

4. Use Parallel Execution When Possible

import asyncio

@app.reasoner
async def parallel_analysis(text: str) -> dict:
    # Execute independent tasks in parallel
    sentiment, entities, summary = await asyncio.gather(
        app.call("sentiment_agent.analyze", text=text),
        app.call("entity_agent.extract", text=text),
        app.call("summary_agent.summarize", text=text)
    )

    return {
        "sentiment": sentiment,
        "entities": entities,
        "summary": summary
    }
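
Practices 3 and 4 can be combined: by default asyncio.gather propagates the first exception to the awaiting caller, so one failing branch hides the results of the others. The sketch below passes return_exceptions=True to collect a per-call outcome instead. The fake_call coroutine is a hypothetical stand-in for app.call that fails for one target on purpose.

```python
import asyncio


async def fake_call(target: str, text: str) -> dict:
    """Stand-in for app.call; the entity agent fails on purpose. Not the SDK."""
    await asyncio.sleep(0)
    if target == "entity_agent.extract":
        raise RuntimeError("entity service unavailable")
    return {"target": target, "length": len(text)}


async def tolerant_analysis(text: str) -> dict:
    targets = [
        "sentiment_agent.analyze",
        "entity_agent.extract",
        "summary_agent.summarize",
    ]
    # return_exceptions=True places raised exceptions in the result list
    # instead of propagating the first one to the caller.
    outcomes = await asyncio.gather(
        *(fake_call(target, text) for target in targets),
        return_exceptions=True,
    )
    report = {}
    for target, outcome in zip(targets, outcomes):
        if isinstance(outcome, Exception):
            report[target] = {"status": "error", "message": str(outcome)}
        else:
            report[target] = {"status": "success", "result": outcome}
    return report


report = asyncio.run(tolerant_analysis("hello"))
for target, entry in report.items():
    print(target, entry["status"])
```

Whether partial results are acceptable depends on the workflow; when every branch is required, the default behavior of failing fast is usually the better choice.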