Cross-Agent Communication

Multi-agent coordination and communication in Agentfield

Cross-Agent Communication

Coordinate distributed agents with automatic context propagation

Building a single AI agent is straightforward. Building a system where multiple agents coordinate across process boundaries? That's where production gets hard.

You need agents to discover each other without hardcoded URLs. You need execution context to flow automatically. You need to track what happened when things fail. You need observability across the entire workflow.

Traditional frameworks leave this as "an exercise for the developer." You end up building service meshes, implementing distributed tracing, writing state synchronization code. Weeks of infrastructure work before you write business logic.

Agentfield makes multi-agent coordination infrastructure. app.call() looks like calling a local function. Behind it, the control plane handles service discovery, context propagation, workflow tracking, and distributed state management.

What You'd Otherwise Build

Traditional Multi-Agent

What you build:

  • Service registry (Consul, etcd)
  • API gateway for routing
  • Context propagation headers
  • Distributed tracing (Jaeger, Zipkin)
  • State synchronization (Redis, database)
  • Retry and circuit breaker logic
  • Workflow tracking system
  • Observability instrumentation

Then you write business logic.

Agentfield Multi-Agent

What you write:

result = await app.call(
    "other-agent.function",
    param=value
)

Agentfield provides:

  • ✓ Service discovery
  • ✓ Automatic routing
  • ✓ Context propagation
  • ✓ Distributed tracing
  • ✓ Shared memory
  • ✓ Retry handling
  • ✓ Workflow DAGs
  • ✓ Full observability

The Distributed Systems Problem

Here's what happens in production when you need multiple agents:

Scenario: A customer submits a support ticket. Your system needs to:

  1. Analyze sentiment (Sentiment Agent)
  2. Check if this customer has had issues before (Customer History Agent)
  3. Determine priority and routing (Triage Agent)
  4. If high priority, escalate (Escalation Agent)
  5. Send notifications (Notification Agent)

Each agent is owned by a different team. Each deploys independently. Each scales separately.

Traditional approach requires you to build:

  • Service registry (how does Triage Agent find Sentiment Agent?)
  • Context passing (how does Escalation Agent know which customer this is for?)
  • Workflow tracking (how do you debug when step 3 fails?)
  • State coordination (how do agents share data without a shared database?)
  • Error handling (what happens when Notification Agent is down?)

Agentfield's approach: Write await app.call("sentiment-agent.analyze"). The control plane handles everything else.

How It Works: Microservices for AI

Agentfield treats each agent as an independent microservice. When you call another agent, you're making a service-to-service call through the control plane.

# Agent 1: Support Triage (Team A owns this)
from agentfield import Agent
from pydantic import BaseModel

app = Agent(node_id="support-triage")

class TriageDecision(BaseModel):
    priority: str  # "low", "medium", "high", "critical"
    category: str
    escalate: bool
    reasoning: str

@app.reasoner()
async def triage_ticket(ticket_id: str, message: str, customer_id: str) -> dict:
    """
    Triages a support ticket using multiple specialized agents.
    Each agent is a separate microservice, possibly owned by different teams.
    """

    # Call Sentiment Agent (Team B's service)
    sentiment = await app.call(
        "sentiment-agent.analyze_text",
        text=message
    )

    # Call Customer History Agent (Team C's service)
    history = await app.call(
        "customer-history.get_recent_issues",
        customer_id=customer_id
    )

    # Use AI to make triage decision based on data from other agents
    decision = await app.ai(
        system="You are a support triage expert.",
        user=f"""
        Ticket: {message}
        Sentiment: {sentiment['sentiment']} (confidence: {sentiment['confidence']})
        Recent issues: {history['issue_count']} in last 30 days
        """,
        schema=TriageDecision
    )

    # If escalation needed, call Escalation Agent (Team D's service)
    if decision.escalate:
        escalation = await app.call(
            "escalation-agent.create_case",
            ticket_id=ticket_id,
            priority=decision.priority,
            reasoning=decision.reasoning
        )

        app.note(f"⚠️ Escalated to case {escalation['case_id']}", tags=["escalation"])

    # Store decision in shared memory (other agents can access this)
    await app.memory.set(f"ticket_{ticket_id}_decision", decision.dict())

    return {
        'ticket_id': ticket_id,
        'decision': decision.dict(),
        'sentiment': sentiment,
        'history_summary': history
    }
# Agent 2: Escalation Handler (Team D owns this, different codebase)
from agentfield import Agent
from pydantic import BaseModel

app = Agent(node_id="escalation-agent")

class EscalationCase(BaseModel):
    case_id: str
    assigned_to: str
    sla_hours: int
    notification_sent: bool

@app.reasoner()
async def create_case(ticket_id: str, priority: str, reasoning: str) -> dict:
    """
    Creates an escalation case. Called by other agents.
    Context flows automatically from the calling agent.
    """

    # Access shared memory from the workflow (set by triage agent)
    ticket_decision = await app.memory.get(f"ticket_{ticket_id}_decision")

    # Use AI to assign the case
    assignment = await app.ai(
        system="Assign escalations based on priority and team availability.",
        user=f"Priority: {priority}, Category: {ticket_decision['category']}",
        schema=dict
    )

    case = EscalationCase(
        case_id=f"ESC-{ticket_id}",
        assigned_to=assignment['team_member'],
        sla_hours=2 if priority == "critical" else 24,
        notification_sent=False
    )

    # Call Notification Agent (Team E's service)
    await app.call(
        "notification-agent.send_alert",
        channel="#escalations",
        case_id=case.case_id,
        assigned_to=case.assigned_to
    )

    case.notification_sent = True

    app.note(f"✅ Case {case.case_id} assigned to {case.assigned_to}", tags=["assignment"])

    return case.dict()

What just happened?

  1. Support Triage called Sentiment Agent (different team, different deployment)
  2. Called Customer History Agent (another team, another deployment)
  3. Made a decision using AI
  4. Called Escalation Agent (yet another team)
  5. Escalation Agent accessed shared memory from the workflow
  6. Called Notification Agent (fifth team)

Agentfield automatically:

  • Discovered where each agent lives (no hardcoded URLs)
  • Propagated workflow context (all agents know they're part of the same workflow)
  • Built a workflow DAG (you can visualize the entire flow)
  • Tracked execution (inputs, outputs, timing for each step)
  • Handled errors (if Notification Agent fails, you get the full context)

What Agentfield Handles Automatically

When you use app.call(), the control plane provides production infrastructure:

Service Discovery

Agentfield maintains a registry of all active agents. When you call "sentiment-agent.analyze_text", the control plane:

  • Looks up where sentiment-agent is running
  • Routes the request to the correct instance
  • Load balances if multiple instances exist
  • Handles health checks and automatic deregistration

No hardcoded URLs. No manual service registry. Teams deploy agents independently, and they discover each other automatically.

Context Propagation

Every execution has context: workflow ID, session ID, parent execution ID. When Agent A calls Agent B, this context flows automatically.

# Agent A
@app.reasoner()
async def process_order(order_id: str):
    # This execution gets workflow_id: wf-123, execution_id: exec-abc

    # Call Agent B
    result = await app.call("inventory.check_stock", order_id=order_id)

    # Agent B's execution automatically gets:
    # - Same workflow_id: wf-123
    # - Parent execution_id: exec-abc
    # - New execution_id: exec-def

This builds a parent-child relationship. You can trace the entire workflow, see which agent called which, and understand the execution flow.

Workflow DAG Construction

Every app.call() creates an edge in the workflow graph. Agentfield automatically builds a directed acyclic graph (DAG) showing:

  • Which agents were involved
  • The order of execution
  • Parent-child relationships
  • Timing for each step
  • Success or failure status

When debugging production issues, you can visualize the entire multi-agent workflow and see exactly where things went wrong.

Distributed Memory Access

Memory is scoped by workflow and session. When Agent A sets a value in memory, Agent B can read it if they're in the same workflow.

# Agent A
await app.memory.set("customer_risk_score", 8.5)

# Agent B (different process, called by Agent A)
risk_score = await app.memory.get("customer_risk_score")  # Gets 8.5

No manual database coordination. No passing data through parameters. Agents share state automatically within appropriate boundaries.

Error Propagation

If a downstream agent fails, the error bubbles up with full context:

@app.reasoner()
async def process_payment(order_id: str):
    try:
        # Call payment processor agent
        result = await app.call("payment-processor.charge", order_id=order_id)
    except Exception as e:
        # You get:
        # - Which agent failed (payment-processor)
        # - Which function failed (charge)
        # - The error message
        # - The full execution context
        # - The workflow DAG up to the failure point

        app.note(f"❌ Payment failed: {e}", tags=["error", "payment"])
        # Handle gracefully

No silent failures. No lost context. You know exactly what happened and where.

Production Patterns

Here are the patterns you'll use in production multi-agent systems:

Sequential Orchestration

When one agent's output feeds into the next:

@app.reasoner()
async def process_loan_application(application: dict) -> dict:
    """
    Sequential workflow: each step depends on the previous.
    Agentfield tracks the entire chain automatically.
    """

    # Step 1: Verify identity
    identity = await app.call(
        "identity-verification.verify_documents",
        documents=application['documents']
    )

    if not identity['verified']:
        return {'status': 'rejected', 'reason': 'identity_verification_failed'}

    # Step 2: Check credit score
    credit = await app.call(
        "credit-bureau.get_score",
        ssn=application['ssn']
    )

    # Step 3: Assess risk using AI
    risk = await app.ai(
        system="You are a loan risk assessor.",
        user=f"Credit score: {credit['score']}, Income: {application['income']}",
        schema=RiskAssessment
    )

    # Step 4: If approved, create account
    if risk.approved:
        account = await app.call(
            "account-management.create_account",
            application_id=application['id'],
            approved_amount=risk.approved_amount
        )

        return {'status': 'approved', 'account_id': account['id']}

    return {'status': 'rejected', 'reason': risk.reasoning}

Agentfield automatically builds the DAG:

process_loan_application
  → verify_documents
  → get_score
  → (AI risk assessment)
  → create_account

Parallel Coordination

When multiple agents can work simultaneously:

import asyncio

@app.reasoner()
async def analyze_contract(contract_text: str) -> dict:
    """
    Parallel workflow: multiple agents analyze simultaneously.
    Faster than sequential, Agentfield coordinates automatically.
    """

    # Launch three analyses in parallel
    results = await asyncio.gather(
        app.call("legal-agent.review_compliance", text=contract_text),
        app.call("financial-agent.extract_terms", text=contract_text),
        app.call("risk-agent.identify_risks", text=contract_text)
    )

    legal, financial, risks = results

    # Synthesize results with AI
    summary = await app.ai(
        system="Combine analysis results into executive summary.",
        user=f"""
        Legal compliance: {legal['compliant']}
        Financial terms: {financial['terms']}
        Identified risks: {risks['risk_count']}
        """,
        schema=ContractSummary
    )

    return {
        'legal': legal,
        'financial': financial,
        'risks': risks,
        'summary': summary.dict()
    }

All three agents run concurrently. Agentfield tracks all three executions as children of the parent workflow.

Event-Driven Coordination

When agents react to state changes:

# Agent A: Monitoring System
@app.reasoner()
async def monitor_system_health(metrics: dict):
    """Monitors system and sets state when anomalies detected."""

    if metrics['error_rate'] > 0.05:
        # Set state in shared memory
        await app.memory.set("system_status", "degraded")
        await app.memory.set("error_details", {
            'error_rate': metrics['error_rate'],
            'timestamp': metrics['timestamp']
        })

# Agent B: Incident Response (different process, different team)
@app.memory.on_change("system_status")
async def handle_system_status_change(event):
    """
    Reacts to system status changes.
    Triggered automatically when memory changes.
    """

    if event.data == "degraded":
        # Get details from memory
        details = await app.memory.get("error_details")

        # Create incident automatically
        incident = await app.call(
            "incident-management.create_incident",
            severity="high",
            details=details
        )

        # Notify on-call team
        await app.call(
            "notification-agent.page_oncall",
            incident_id=incident['id']
        )

        app.note(f"🚨 Incident {incident['id']} created and on-call notified", tags=["incident"])

Agent B doesn't poll. It reacts to memory changes automatically. Agentfield handles the event subscription and delivery.

Calling Patterns

Agentfield supports multiple ways to call agents, depending on your needs:

Same Agent, Different Reasoner

# Direct function call (same process)
from reasoners.sentiment import analyze_text

@app.reasoner()
async def process_feedback(feedback: str):
    # Call another reasoner in the same agent
    sentiment = analyze_text(feedback)  # Direct Python call
    ...

Different Agent, Same Workflow

# Cross-agent call (different process, context propagates)
@app.reasoner()
async def process_order(order_id: str):
    # Call another agent via Agentfield control plane
    inventory = await app.call(
        "inventory-agent.check_stock",
        order_id=order_id
    )
    # Workflow context flows automatically

External HTTP Call

# Any platform can call Agentfield agents via REST
curl -X POST http://af-server/api/v1/execute/support-triage.triage_ticket \
  -H "Content-Type: application/json" \
  -d '{
    "input": {
      "ticket_id": "T-12345",
      "message": "Platform keeps crashing!",
      "customer_id": "C-789"
    }
  }'

Response:

{
  "execution_id": "exec-abc123",
  "run_id": "wf-def456",
  "status": "completed",
  "result": {
    "ticket_id": "T-12345",
    "decision": {
      "priority": "high",
      "category": "technical",
      "escalate": true
    }
  },
  "duration_ms": 3200,
  "finished_at": "2024-07-08T18:25:05Z"
}

Get the complete workflow DAG:

# Fetch workflow visualization
curl http://af-server/api/v1/workflows/wf-def456/dag
{
  "root_workflow_id": "wf-def456",
  "dag": {
    "workflow_id": "wf-def456",
    "execution_id": "exec-abc123",
    "target": "support-triage.triage_ticket",
    "status": "completed",
    "children": [
      {
        "execution_id": "exec-def789",
        "target": "sentiment-agent.analyze_text",
        "status": "completed"
      },
      {
        "execution_id": "exec-ghi012",
        "target": "customer-history.get_recent_issues",
        "status": "completed"
      },
      {
        "execution_id": "exec-jkl345",
        "target": "escalation-agent.create_case",
        "status": "completed",
        "children": [
          {
            "execution_id": "exec-mno678",
            "target": "notification-agent.send_alert",
            "status": "completed"
          }
        ]
      }
    ]
  }
}

You get the result immediately, then visualize the complete execution graph. Perfect for debugging and monitoring.

Real-Time Monitoring

Watch multi-agent workflows execute in real-time using Server-Sent Events:

# Stream workflow events
curl -N -H "Accept: text/event-stream" \
  http://af-server/api/v1/workflows/wf-def456/events
event: execution.started
data: {"target": "support-triage.triage_ticket", "timestamp": "2024-01-15T10:30:00Z"}

event: execution.started
data: {"target": "sentiment-agent.analyze_text", "parent": "exec-abc123", "timestamp": "2024-01-15T10:30:01Z"}

event: execution.completed
data: {"target": "sentiment-agent.analyze_text", "status": "completed", "duration_ms": 450}

event: execution.started
data: {"target": "escalation-agent.create_case", "parent": "exec-abc123", "timestamp": "2024-01-15T10:30:02Z"}

event: workflow.completed
data: {"workflow_id": "wf-def456", "total_executions": 5, "duration_ms": 3200}

Perfect for building real-time dashboards or monitoring production workflows.

What This Enables

For Developers

Write await app.call() like calling a local function. Agentfield handles service discovery, context propagation, and workflow tracking. Focus on business logic.

For Teams

Each team owns their agents. Deploy independently without coordination. Share state automatically through workflow-scoped memory. No monolithic coupling.

For Production

Complete workflow observability with automatic DAG construction. Distributed tracing across all agents. Cryptographic audit trails for compliance.

For Architecture

Microservices architecture for AI agents. Independent scaling, deployment, and ownership. Automatic coordination without manual service mesh setup.

Next Steps

You now understand how Agentfield enables multi-agent coordination:

Or start building with the Quick Start Guide.