Workflow Tracing
Distributed tracing headers, DAG visualization, and real-time SSE streaming for multi-agent workflows.
See every agent call, every dependency, every millisecond -- across your entire multi-agent workflow.
When Agent A calls Agent B, which in turn calls Agent C, you need to understand the full execution graph: what ran, in what order, how long each step took, and where failures occurred. AgentField propagates tracing headers on every cross-agent call, builds a DAG of the execution graph, and streams events in real time via SSE.
from agentfield import Agent

app = Agent(node_id="orchestrator")

@app.reasoner()
async def analyze_portfolio(portfolio: dict) -> dict:
    # Each call propagates tracing headers automatically
    # The control plane builds a DAG from the execution tree
    risk = await app.call("risk-analyzer.assess", input=portfolio)
    compliance = await app.call("compliance-checker.verify", input=portfolio)
    allocation = await app.call("allocation-optimizer.optimize", input={
        "portfolio": portfolio,
        "risk": risk,
        "compliance": compliance,
    })
    return {"risk": risk, "compliance": compliance, "allocation": allocation}

# After execution, query the workflow DAG via the REST API
# GET /api/ui/v1/workflows/{workflowId}/dag
import httpx

async def inspect_workflow(workflow_id: str):
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"http://localhost:8080/api/ui/v1/workflows/{workflow_id}/dag")
        resp.raise_for_status()  # Fail fast on 4xx/5xx instead of parsing an error body
        dag = resp.json()

    # The response is a nested tree rooted at dag["dag"] with children
    def walk(node, indent=0):
        # duration_ms may be absent while a step is still running
        print(f"{' ' * indent}{node['agent_node_id']}.{node['reasoner_id']}: {node['status']} ({node.get('duration_ms', '...')}ms)")
        for child in node.get("children", []):
            walk(child, indent + 1)

    walk(dag["dag"])

# Query the full execution DAG for a workflow
curl http://localhost:8080/api/ui/v1/workflows/wf_abc123/dag | jq .
# {
#   "root_workflow_id": "wf_abc123",
#   "workflow_status": "succeeded",
#   "workflow_name": "analyze_portfolio",
#   "total_nodes": 4,
#   "max_depth": 1,
#   "dag": {
#     "execution_id": "exec_1", "agent_node_id": "orchestrator", "reasoner_id": "analyze_portfolio",
#     "status": "succeeded", "workflow_depth": 0, "children": [
#       {"execution_id": "exec_2", "agent_node_id": "risk-analyzer", "reasoner_id": "assess", "status": "succeeded", "workflow_depth": 1, "children": []},
#       {"execution_id": "exec_3", "agent_node_id": "compliance-checker", "reasoner_id": "verify", "status": "succeeded", "workflow_depth": 1, "children": []},
#       {"execution_id": "exec_4", "agent_node_id": "allocation-optimizer", "reasoner_id": "optimize", "status": "succeeded", "workflow_depth": 1, "children": []}
#     ]
#   }
# }
# Stream real-time execution events via SSE (plain data: frames, no event: field)
curl -N http://localhost:8080/api/ui/v1/executions/events
# data: {"type":"connected","message":"Execution events stream connected","timestamp":"2026-03-24T10:00:00Z"}
#
# data: {"type":"execution.started","execution_id":"exec_1","workflow_id":"wf_abc123","agent_node_id":"orchestrator"}
#
# data: {"type":"execution.completed","execution_id":"exec_2","status":"succeeded","duration_ms":3200}
# Inspect tracing headers on a specific execution
curl http://localhost:8080/api/v1/executions/exec_2 | jq .trace_context
# {
#   "workflow_id": "wf_abc123",
#   "correlation_id": "corr_xyz",
#   "parent_execution_id": "exec_1",
#   "caller_node_id": "orchestrator",
#   "depth": 1,
#   "span_id": "span_abc"
# }

What just happened
- Every cross-agent call carried tracing context automatically
- The control plane reconstructed the workflow as a queryable DAG
- Real-time events exposed execution start, completion, and depth without extra instrumentation
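
To act on the event stream from code rather than from curl, the `data:` frames have to be split on blank lines and decoded as JSON. The following is a minimal sketch of that parsing step only, assuming the frame shape shown in the curl output above (JSON payloads in `data:` lines, no `event:` field). The function name `parse_sse_data_frames` and the `sample` list are hypothetical, and the sketch is not part of the AgentField SDK.

```python
import json
from typing import Iterable, Iterator


def parse_sse_data_frames(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded JSON object per SSE frame built from data: lines."""
    buffer: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith("data:"):
            value = line[len("data:"):]
            # The SSE format drops a single leading space after the colon.
            if value.startswith(" "):
                value = value[1:]
            buffer.append(value)
        elif line == "" and buffer:
            # A blank line terminates the current frame.
            yield json.loads("\n".join(buffer))
            buffer = []
        # Any other line (comments, other fields) is ignored in this sketch.
    if buffer:
        # Convenience: flush a final frame if the input ended without a blank line.
        yield json.loads("\n".join(buffer))


# Sample lines copied from the curl output above (assumed representative).
sample = [
    'data: {"type":"connected","message":"Execution events stream connected","timestamp":"2026-03-24T10:00:00Z"}',
    "",
    'data: {"type":"execution.started","execution_id":"exec_1","workflow_id":"wf_abc123","agent_node_id":"orchestrator"}',
    "",
    'data: {"type":"execution.completed","execution_id":"exec_2","status":"succeeded","duration_ms":3200}',
    "",
]

events = list(parse_sse_data_frames(sample))
completed = [e for e in events if e["type"] == "execution.completed"]
for e in completed:
    print(e["execution_id"], e["status"], e["duration_ms"])
```

The parser takes any iterable of text lines, so it can be fed from a streaming HTTP response's line iterator or from a saved log file. Keeping the network call out of the parser makes it easy to test against captured output.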
Example trace context:
{
  "workflow_id": "wf_abc123",
  "execution_id": "exec_2",
  "parent_execution_id": "exec_1",
  "caller_node_id": "orchestrator",
  "depth": 1
}
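
The `parent_execution_id` field is what makes a tree recoverable from flat records: each execution points at its caller, so grouping records by parent yields the nested shape returned by the DAG endpoint. The sketch below illustrates that idea under stated assumptions and is not AgentField's actual control-plane code. `build_tree` and `max_depth` are hypothetical helpers, the root record is assumed to carry `parent_execution_id` of `None` and `depth` of 0, and the records for `exec_3` and `exec_4` are filled in by analogy with the example above.

```python
from collections import defaultdict


def build_tree(contexts: list[dict]) -> dict:
    """Nest flat trace-context records into a tree using parent_execution_id."""
    children_of = defaultdict(list)
    root = None
    for ctx in contexts:
        parent = ctx.get("parent_execution_id")
        if parent is None:
            root = ctx  # Assumption: the root execution has no parent
        else:
            children_of[parent].append(ctx)
    if root is None:
        raise ValueError("no root execution (record without a parent) found")

    def attach(ctx: dict) -> dict:
        # Copy the record so the input list is left unmodified.
        node = dict(ctx)
        node["children"] = [attach(c) for c in children_of[ctx["execution_id"]]]
        return node

    return attach(root)


def max_depth(node: dict) -> int:
    """Return the largest 'depth' value found anywhere in the tree."""
    return max([node["depth"]] + [max_depth(c) for c in node["children"]])


# Illustrative records; only exec_2 appears verbatim in the example above.
contexts = [
    {"workflow_id": "wf_abc123", "execution_id": "exec_1",
     "parent_execution_id": None, "caller_node_id": None, "depth": 0},
    {"workflow_id": "wf_abc123", "execution_id": "exec_2",
     "parent_execution_id": "exec_1", "caller_node_id": "orchestrator", "depth": 1},
    {"workflow_id": "wf_abc123", "execution_id": "exec_3",
     "parent_execution_id": "exec_1", "caller_node_id": "orchestrator", "depth": 1},
    {"workflow_id": "wf_abc123", "execution_id": "exec_4",
     "parent_execution_id": "exec_1", "caller_node_id": "orchestrator", "depth": 1},
]

tree = build_tree(contexts)
print(tree["execution_id"], len(tree["children"]), max_depth(tree))
```

Grouping by parent first and attaching children afterwards means the records can arrive in any order, which matters when executions are reported as they complete rather than as they start.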