
Orchestration Patterns

Multi-agent patterns for parallel, sequential, and fan-out/fan-in execution using cross-agent calls and shared memory.

[Figure: three orchestration patterns (sequential, parallel, fan-in)]

Compose agents into parallel, sequential, and fan-out/fan-in workflows using the primitives you already have.

AgentField does not require a separate orchestration DSL or workflow engine. You orchestrate with ordinary language primitives, and the control plane turns the resulting calls into a traced workflow.

You keep native asyncio (Python), Promise.all (TypeScript), or goroutines (Go) and still get a workflow DAG, propagated context, and execution visibility across the whole fan-out/fan-in flow.
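As a rough illustration of how the sequential and parallel cases differ when written with plain asyncio, the sketch below uses a hypothetical `call` coroutine as a stand-in for a cross-agent call. It is not the AgentField API, and the agent names `extractor.parse` and `summarizer.run` are invented for the example.

```python
import asyncio


async def call(target: str, **payload) -> dict:
    """Hypothetical stand-in for a cross-agent call; sleeps instead of doing I/O."""
    await asyncio.sleep(0.05)
    return {"target": target, "payload": payload}


async def sequential(doc: str) -> list[str]:
    # Sequential: the second step consumes the first step's result,
    # so it cannot start until the first await has finished.
    extracted = await call("extractor.parse", doc=doc)
    summary = await call("summarizer.run", text=extracted["payload"]["doc"])
    return [extracted["target"], summary["target"]]


async def parallel(doc: str) -> list[str]:
    # Parallel: the steps are independent, so they are awaited together.
    # asyncio.gather returns results in the order the awaitables were passed.
    results = await asyncio.gather(
        call("extractor.parse", doc=doc),
        call("summarizer.run", text=doc),
    )
    return [r["target"] for r in results]
```

Running `asyncio.run(sequential("report"))` or `asyncio.run(parallel("report"))` exercises each shape. Which one to use depends only on whether a step needs the output of the step before it.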

Python:

import asyncio

@app.reasoner()
async def underwrite_application(application: dict) -> dict:
    # Fan-out: run three checks in parallel; each is a separate agent.
    # Every call becomes a node in the execution DAG, so the workflow is observable without extra code.
    credit, fraud, compliance = await asyncio.gather(
        app.call("credit-scorer.score", ssn=application["ssn"]),
        app.call("fraud-detector.screen", applicant=application),
        app.call("compliance.check_sanctions", name=application["name"]),
    )
    # Fan-in: aggregate results for final decision
    if fraud["risk"] > 0.7 or not compliance["cleared"]:
        return {"decision": "denied", "reason": "risk threshold exceeded"}
    return {"decision": "approved", "credit_score": credit["score"]}

TypeScript:

agent.reasoner('underwriteApplication', async (ctx) => {
  // Fan-out: run three checks in parallel — each is a separate agent
  const [credit, fraud, compliance] = await Promise.all([
    agent.call('credit-scorer.score', { ssn: ctx.input.ssn }),
    agent.call('fraud-detector.screen', { applicant: ctx.input }),
    agent.call('compliance.checkSanctions', { name: ctx.input.name }),
  ]);
  // Fan-in: aggregate results for final decision
  if (fraud.risk > 0.7 || !compliance.cleared) {
    return { decision: 'denied', reason: 'risk threshold exceeded' };
  }
  return { decision: 'approved', creditScore: credit.score };
});
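
Go:

// Needs imports: context, errors, fmt, sync
a.RegisterReasoner("underwrite_application", func(ctx context.Context, input map[string]any) (any, error) {
    // Fan-out: run three checks in parallel; each is a separate agent
    var credit, fraud, compliance map[string]any
    var creditErr, fraudErr, complianceErr error
    var wg sync.WaitGroup
    wg.Add(3)
    go func() { defer wg.Done(); credit, creditErr = a.Call(ctx, "credit-scorer.score", input) }()
    go func() { defer wg.Done(); fraud, fraudErr = a.Call(ctx, "fraud-detector.screen", input) }()
    go func() { defer wg.Done(); compliance, complianceErr = a.Call(ctx, "compliance.check_sanctions", input) }()
    wg.Wait()
    // Surface any failed call instead of type-asserting on a nil result
    if err := errors.Join(creditErr, fraudErr, complianceErr); err != nil {
        return nil, err
    }
    // Fan-in: aggregate results; every call is tracked in the execution DAG
    risk, riskOK := fraud["risk"].(float64)
    cleared, clearedOK := compliance["cleared"].(bool)
    if !riskOK || !clearedOK {
        return nil, fmt.Errorf("unexpected response shape from fraud or compliance check")
    }
    if risk > 0.7 || !cleared {
        return map[string]any{"decision": "denied"}, nil
    }
    return map[string]any{"decision": "approved", "credit_score": credit["score"]}, nil
})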

What just happened

  • The orchestration logic stayed inside normal application code
  • Parallel child calls became tracked DAG nodes automatically
  • The control plane recorded the fan-out and fan-in structure without a separate workflow definition
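To build intuition for how parallel child calls can end up as nodes under a parent, here is a minimal in-process sketch. It is not how AgentField's control plane is implemented; `traced_call`, `roots`, and the stub handlers with their return values are all hypothetical. It relies on one fact about asyncio: each task created by `asyncio.gather` gets a copy of the caller's `contextvars` context, so a child can discover which node spawned it.

```python
import asyncio
import contextvars

# Hypothetical tracer state: the node currently executing, and top-level nodes.
_current = contextvars.ContextVar("current_node", default=None)
roots = []


async def traced_call(target, handler, **payload):
    """Run handler as a child node of whichever node is currently executing."""
    node = {"target": target, "status": "running", "children": []}
    parent = _current.get()
    (parent["children"] if parent is not None else roots).append(node)
    token = _current.set(node)  # calls made inside handler attach to this node
    try:
        node["result"] = await handler(**payload)
        node["status"] = "completed"
    except Exception:
        node["status"] = "failed"
        raise
    finally:
        _current.reset(token)
    return node["result"]


# Stub handlers standing in for separate agents; the values are placeholders.
async def score(ssn):
    return {"score": 720}

async def screen(applicant):
    return {"risk": 0.1}

async def check_sanctions(name):
    return {"cleared": True}


async def underwrite(application):
    # Fan-out: each gathered coroutine runs in its own task with a copied context.
    credit, fraud, compliance = await asyncio.gather(
        traced_call("credit-scorer.score", score, ssn=application["ssn"]),
        traced_call("fraud-detector.screen", screen, applicant=application),
        traced_call("compliance.check_sanctions", check_sanctions, name=application["name"]),
    )
    # Fan-in: aggregate results for the final decision.
    if fraud["risk"] > 0.7 or not compliance["cleared"]:
        return {"decision": "denied"}
    return {"decision": "approved", "credit_score": credit["score"]}


asyncio.run(traced_call("underwriter.underwrite_application", underwrite,
                        application={"ssn": "000-00-0000", "name": "Ada"}))
```

After the run, `roots[0]` holds one parent node with three children, without any workflow definition having been written. A real control plane would additionally have to propagate this context across process and network boundaries.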

Example DAG shape:

{
  "target": "underwriter.underwrite_application",
  "children": [
    { "target": "credit-scorer.score", "status": "completed" },
    { "target": "fraud-detector.screen", "status": "completed" },
    { "target": "compliance.check_sanctions", "status": "completed" }
  ],
  "result": { "decision": "approved" }
}
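A DAG in this shape is straightforward to inspect programmatically. The sketch below assumes only the fields visible in the example above (`target`, `status`, `children`, `result`); an actual response may carry more fields or different names, and `walk` is a hypothetical helper, not part of any SDK.

```python
import json

# The example DAG from above, as a JSON document.
dag = json.loads("""
{
  "target": "underwriter.underwrite_application",
  "children": [
    { "target": "credit-scorer.score", "status": "completed" },
    { "target": "fraud-detector.screen", "status": "completed" },
    { "target": "compliance.check_sanctions", "status": "completed" }
  ],
  "result": { "decision": "approved" }
}
""")


def walk(node, depth=0):
    """Yield (depth, target, status) for every node, depth-first."""
    # The root in the example has no "status" field, so .get() returns None for it.
    yield depth, node["target"], node.get("status")
    for child in node.get("children", []):
        yield from walk(child, depth + 1)


rows = list(walk(dag))
all_children_done = all(c["status"] == "completed" for c in dag["children"])

for depth, target, status in rows:
    print("  " * depth + f"{target} [{status}]")
```

The same traversal works for deeper graphs, since each child may carry its own `children` list.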