app.call()

Cross-agent communication via Agentfield execution gateway with automatic workflow tracking

Execute reasoners and skills on other agent nodes via the Agentfield execution gateway. Calls automatically propagate execution context, build the workflow DAG, and handle service discovery without manual configuration.

Basic Example

from agentfield import Agent

app = Agent(node_id="support-router")

@app.reasoner()
async def route_ticket(ticket_id: int, message: str) -> dict:
    """Route support ticket based on sentiment analysis."""

    # Call reasoner on another agent
    sentiment = await app.call(
        "sentiment-analyzer.analyze_text",
        text=message
    )

    # Use the result (always returns dict)
    if sentiment.get("is_urgent"):
        return {"ticket_id": ticket_id, "action": "escalate"}
    else:
        return {"ticket_id": ticket_id, "action": "queue"}

Return Type Behavior

  • Same-Agent Direct Calls: Calling a reasoner/skill directly by function name within the same agent returns the actual Pydantic model (or whatever return type is defined)
  • Cross-Agent app.call(): Using app.call() to call any agent (including your own) returns dict objects (JSON-like), similar to calling a REST API

Future Enhancement: Automatic Pydantic model conversion for app.call() will be added in a future release. For now, use direct function calls for same-agent type-safe access, or manually convert dict responses from app.call().

Same-Agent: Direct Call vs app.call()

from pydantic import BaseModel

class SentimentResult(BaseModel):
    sentiment: str
    confidence: float
    is_urgent: bool

@app.reasoner()
async def analyze_text(text: str) -> SentimentResult:
    """Returns SentimentResult Pydantic model."""
    return await app.ai(
        system="Analyze sentiment.",
        user=text,
        schema=SentimentResult
    )

@app.reasoner()
async def process_feedback(feedback: str) -> dict:
    """Demonstrates the difference between direct call and app.call()."""

    # ✅ Direct call - gets Pydantic model (same agent)
    result: SentimentResult = await analyze_text(feedback)
    print(result.sentiment)  # Type-safe access
    print(result.confidence)  # IDE autocomplete works

    # ❌ app.call() - gets dict (even for same agent!)
    result_dict: dict = await app.call(
        "my-agent.analyze_text",  # Goes through Agentfield server
        text=feedback
    )
    print(result_dict.get("sentiment"))  # Dict access required
    print(result_dict.get("confidence"))  # No type safety

    return {
        "direct_call_type": "SentimentResult",
        "app_call_type": "dict"
    }

When to Use Each Approach:

  • Direct function calls: Use within the same agent for type-safe, Pydantic model returns
  • app.call(): Use for cross-agent communication or when you need workflow tracking/DAG building for same-agent calls

Even when calling your own agent's functions, app.call() routes through the Agentfield server and returns dict for consistency across all cross-agent communication.

Cross-Agent Pattern - Dict Returns

from pydantic import BaseModel

class SentimentResult(BaseModel):
    sentiment: str
    confidence: float
    is_urgent: bool

@app.reasoner()
async def analyze_feedback(feedback: str) -> dict:
    """Cross-agent calls always return dict, not Pydantic models."""

    # Cross-agent call - returns dict, not SentimentResult instance
    result: dict = await app.call(
        "sentiment-analyzer.analyze_text",  # Different agent
        text=feedback
    )

    # Must access as dict
    sentiment = result.get("sentiment")
    confidence = result.get("confidence")

    return {
        "feedback": feedback,
        "sentiment": sentiment,
        "confidence": confidence
    }

Parameters

  • target (str): Fully qualified target in the form "node-id.function_name" (e.g. "sentiment-analyzer.analyze_text")
  • **kwargs: Keyword arguments forwarded to the target reasoner or skill

Returns: dict - JSON-like object containing the execution result. Always returns dict regardless of target function's return type annotation.
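
Based on the examples in this page, a typical invocation looks like the following sketch (the target and keyword arguments shown are illustrative):

# Target is a "node-id.function_name" string; remaining keyword
# arguments are forwarded to the target reasoner or skill.
result: dict = await app.call(
    "sentiment-analyzer.analyze_text",
    text="The package arrived damaged and support never replied."
)

print(result.get("sentiment"))  # dict access - no Pydantic model is returned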

Routing Behavior

All Calls Route Through Agentfield Server: Every app.call() request goes through the Agentfield execution gateway, even when calling functions on the same agent node. This provides:

  • Unified Interface: Consistent API for all cross-agent communication
  • Service Discovery: Automatic routing without hardcoded URLs or IP addresses
  • Distributed Execution: Call agents running on different servers, containers, or networks
  • Consistent Return Type: Always returns dict for predictable cross-agent contracts
  • Network Transparency: Same code works locally and in distributed deployments

This design enables true distributed agent systems where agents can be deployed anywhere and still communicate seamlessly.

How Routing Works

# Example: support-router calling sentiment-analyzer

@app.reasoner()
async def route_ticket(message: str) -> dict:
    # 1. app.call() sends request to Agentfield server
    # 2. Agentfield server looks up "sentiment-analyzer" in service registry
    # 3. Agentfield server forwards request to sentiment-analyzer agent
    # 4. sentiment-analyzer executes the function
    # 5. Result flows back through Agentfield server
    # 6. Workflow DAG is updated with parent-child relationship

    sentiment = await app.call(
        "sentiment-analyzer.analyze_text",
        text=message
    )

    return {"sentiment": sentiment.get("sentiment")}

Common Patterns

Cross-Agent Reasoner Call

Call AI-powered reasoners on other agents for distributed intelligence.

from agentfield import Agent

app = Agent(node_id="customer-service")

@app.reasoner()
async def handle_complaint(customer_id: int, complaint: str) -> dict:
    """Handle customer complaint using multiple specialized agents."""

    # Analyze sentiment on specialized agent
    sentiment = await app.call(
        "sentiment-analyzer.analyze_text",
        text=complaint
    )

    # Assess risk on another specialized agent
    risk = await app.call(
        "risk-analyzer.assess_customer_risk",
        customer_id=customer_id,
        sentiment=sentiment.get("sentiment"),
        is_urgent=sentiment.get("is_urgent")
    )

    # Generate response based on analysis
    if risk.get("risk_level") == "high":
        action = "immediate_escalation"
    else:
        action = "standard_queue"

    return {
        "customer_id": customer_id,
        "sentiment": sentiment.get("sentiment"),
        "risk_level": risk.get("risk_level"),
        "action": action
    }

Cross-Agent Skill Call

Call deterministic skills for business logic and integrations.

@app.reasoner()
async def process_order(order_id: int, customer_email: str) -> dict:
    """Process order using distributed skills."""

    # Validate payment via payment agent
    payment_result = await app.call(
        "payment-agent.validate_payment",
        order_id=order_id
    )

    if not payment_result.get("is_valid"):
        return {"status": "payment_failed", "order_id": order_id}

    # Update inventory via inventory agent
    inventory_result = await app.call(
        "inventory-agent.reserve_items",
        order_id=order_id
    )

    # Send confirmation via notification agent
    await app.call(
        "notification-agent.send_email",
        to=customer_email,
        subject="Order Confirmed",
        body=f"Order {order_id} confirmed"
    )

    return {
        "status": "success",
        "order_id": order_id,
        "payment": payment_result,
        "inventory": inventory_result
    }

Complex Multi-Agent Orchestration

Build sophisticated workflows spanning multiple agents.

@app.reasoner()
async def process_user_feedback(user_id: int, feedback: str) -> dict:
    """Orchestrate complex multi-agent workflow."""

    # Step 1: Analyze sentiment
    sentiment = await app.call(
        "sentiment-analyzer.analyze_text",
        text=feedback
    )

    # Step 2: Assess risk based on sentiment
    risk = await app.call(
        "risk-analyzer.assess_user_risk",
        user_id=user_id,
        sentiment=sentiment.get("sentiment"),
        is_urgent=sentiment.get("is_urgent")
    )

    # Step 3: Generate recommendations
    recommendations = await app.call(
        "recommendation-engine.generate_recommendations",
        user_id=user_id,
        sentiment_data=sentiment,
        risk_data=risk
    )

    # Step 4: Take automated actions if needed
    actions = []
    if risk.get("risk_level") == "high":
        # Notify support team
        alert = await app.call(
            "notification-agent.send_alert",
            type="high_risk_user",
            user_id=user_id,
            details=risk
        )
        actions.append(f"Alert sent: {alert.get('alert_id')}")

        # Create priority ticket
        ticket = await app.call(
            "ticketing-agent.create_priority_ticket",
            user_id=user_id,
            feedback=feedback,
            priority="high"
        )
        actions.append(f"Ticket created: {ticket.get('ticket_id')}")

    return {
        "user_id": user_id,
        "sentiment_score": sentiment.get("confidence"),
        "risk_level": risk.get("risk_level"),
        "recommendations": recommendations.get("suggestions", []),
        "actions_taken": actions
    }

Parallel Execution: Use asyncio.gather() for independent cross-agent calls to reduce total execution time. Each call still goes through the Agentfield server but executes concurrently. See the sketch below, and the batched pattern under Optimize for Performance.
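
A minimal sketch of the pattern; the targets reuse the "sentiment-analyzer.analyze_text" and "user-service.get_user_data" functions from the examples on this page, and the fields accessed on the results are illustrative:

import asyncio

@app.reasoner()
async def parallel_analysis(user_id: int, feedback: str) -> dict:
    """Run independent cross-agent calls concurrently."""

    # The two calls do not depend on each other, so issue them together.
    # Each still routes through the Agentfield server and appears in the workflow DAG.
    sentiment, user_data = await asyncio.gather(
        app.call("sentiment-analyzer.analyze_text", text=feedback),
        app.call("user-service.get_user_data", user_id=user_id),
    )

    return {
        "user_name": user_data.get("name"),
        "sentiment": sentiment.get("sentiment"),
        "is_urgent": sentiment.get("is_urgent"),
    }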

Error Handling and Fallbacks

Build resilient workflows with proper error handling.

@app.reasoner()
async def resilient_workflow(user_id: int) -> dict:
    """Handle failures gracefully in multi-agent workflows."""

    results = {
        "user_id": user_id,
        "steps_completed": [],
        "errors": []
    }

    try:
        # Critical step - must succeed
        user_data = await app.call(
            "user-service.get_user_data",
            user_id=user_id
        )
        results["user_data"] = user_data
        results["steps_completed"].append("user_data_retrieved")

    except Exception as e:
        results["errors"].append(f"Failed to get user data: {str(e)}")
        results["status"] = "failed"
        return results

    # Optional enhancement - can fail gracefully
    try:
        enrichment = await app.call(
            "enrichment-service.enrich_user_data",
            user_data=user_data
        )
        results["enriched_data"] = enrichment
        results["steps_completed"].append("data_enriched")
    except Exception as e:
        results["errors"].append(f"Enrichment failed: {str(e)}")
        # Continue with basic data

    # Final processing with available data
    try:
        final_result = await app.call(
            "processor.finalize_processing",
            user_data=user_data,
            enriched_data=results.get("enriched_data")
        )
        results["final_result"] = final_result
        results["steps_completed"].append("processing_completed")
        results["status"] = "success"
    except Exception as e:
        results["errors"].append(f"Processing failed: {str(e)}")
        results["status"] = "partial_success"

    return results

Async Execution for Long-Running Workflows

For long-running multi-agent workflows, use async execution with polling.

@app.reasoner()
async def trigger_long_workflow(data: dict) -> dict:
    """Trigger long-running workflow asynchronously."""

    # For very long workflows, use async execution
    # The Agentfield server handles polling and result retrieval
    result = await app.call(
        "data-processor.process_large_dataset",
        dataset=data,
        processing_options={
            "batch_size": 1000,
            "parallel_workers": 10
        }
    )

    # Result is automatically polled and returned
    # See async execution docs for webhook patterns
    return result

Async Execution: Agentfield automatically handles async execution for long-running workflows. The SDK polls for results and returns them when complete. For advanced patterns including webhooks and custom polling, see Async Execution.

Type-Safe Pattern with Manual Conversion

Current workaround for type safety until automatic Pydantic conversion is available.

from pydantic import BaseModel
from typing import List

class UserProfile(BaseModel):
    user_id: int
    name: str
    email: str
    preferences: dict

class RecommendationList(BaseModel):
    recommendations: List[str]
    confidence: float

@app.reasoner()
async def get_personalized_recommendations(user_id: int) -> RecommendationList:
    """Type-safe cross-agent calls with manual conversion."""

    # Get user profile (returns dict)
    profile_dict = await app.call(
        "user-service.get_profile",
        user_id=user_id
    )

    # Convert to Pydantic for type safety
    profile = UserProfile(**profile_dict)

    # Get recommendations (returns dict)
    recs_dict = await app.call(
        "recommendation-engine.generate_recommendations",
        user_id=profile.user_id,
        preferences=profile.preferences
    )

    # Convert to Pydantic for type safety
    recommendations = RecommendationList(**recs_dict)

    # Type-safe access throughout
    if recommendations.confidence > 0.8:
        # High confidence recommendations
        return recommendations
    else:
        # Fallback to default recommendations
        return RecommendationList(
            recommendations=["default_item_1", "default_item_2"],
            confidence=0.5
        )

Advanced Features

Zero-Latency Identity Generation

Zero-Latency Identity: Verifiable Credentials (VCs) and Decentralized Identifiers (DIDs) are generated asynchronously in the background and do not block execution. Your workflows run at full speed while identity credentials are created for audit trails and compliance.

@app.reasoner()
async def compliant_workflow(data: dict) -> dict:
    """Workflow with automatic identity generation."""

    # Execute cross-agent call
    result = await app.call(
        "processor.process_data",
        data=data
    )

    # VC/DID generation happens asynchronously
    # No performance impact on your workflow
    # Credentials available for audit via Agentfield UI

    return result

Timeout Configuration

Configure execution timeouts for different workflow patterns.

import asyncio

from agentfield import Agent, AsyncConfig

# Configure timeouts at agent level
app = Agent(
    node_id="orchestrator",
    async_config=AsyncConfig(
        default_execution_timeout=300.0,  # 5 minutes default
        max_execution_timeout=600.0,      # 10 minutes max
        enable_async_execution=True,
        fallback_to_sync=True
    )
)

@app.reasoner()
async def long_running_workflow(data: dict) -> dict:
    """Workflow with appropriate timeout handling."""

    try:
        # Long-running cross-agent call
        # Uses configured timeout (300s default, 600s max)
        result = await app.call(
            "data-processor.process_large_dataset",
            dataset=data
        )
        return result

    except asyncio.TimeoutError:
        # Handle timeout gracefully
        return {
            "status": "timeout",
            "message": "Processing exceeded timeout limit"
        }

Fallback Strategies

Implement fallback logic for resilient multi-agent systems.

@app.reasoner()
async def resilient_analysis(text: str) -> dict:
    """Use fallback agents if primary fails."""

    # Try primary sentiment analyzer
    try:
        sentiment = await app.call(
            "sentiment-analyzer-primary.analyze_text",
            text=text
        )
        return {
            "sentiment": sentiment.get("sentiment"),
            "source": "primary"
        }
    except Exception as e:
        # Fallback to secondary analyzer
        try:
            sentiment = await app.call(
                "sentiment-analyzer-backup.analyze_text",
                text=text
            )
            return {
                "sentiment": sentiment.get("sentiment"),
                "source": "backup"
            }
        except Exception as e2:
            # Final fallback to simple rule-based analysis
            return {
                "sentiment": "neutral",
                "source": "fallback",
                "error": str(e2)
            }

Best Practices

Design for Observability

Every cross-agent call creates a traceable workflow edge in the DAG.

@app.reasoner()
async def observable_workflow(order_id: int) -> dict:
    """Design workflows with clear, observable steps."""

    # Each call creates a node in the workflow DAG
    # Visible in Agentfield UI for debugging and monitoring

    # Step 1: Validate
    validation = await app.call(
        "validator.validate_order",
        order_id=order_id
    )

    # Step 2: Process
    if validation.get("is_valid"):
        processing = await app.call(
            "processor.process_order",
            order_id=order_id
        )

        # Step 3: Notify
        notification = await app.call(
            "notifier.send_confirmation",
            order_id=order_id,
            result=processing
        )

        return {
            "status": "success",
            "order_id": order_id,
            "processing": processing,
            "notification": notification
        }
    else:
        return {
            "status": "validation_failed",
            "errors": validation.get("errors", [])
        }

Handle Network Failures

Cross-agent calls involve network communication, so handle failures appropriately.

import asyncio

@app.reasoner()
async def network_resilient_workflow(data: dict) -> dict:
    """Handle network failures in cross-agent calls."""

    max_retries = 3
    retry_delay = 1.0

    for attempt in range(max_retries):
        try:
            result = await app.call(
                "external-service.process_data",
                data=data
            )
            return result

        except Exception as e:
            if attempt < max_retries - 1:
                # Exponential backoff
                await asyncio.sleep(retry_delay * (2 ** attempt))
                continue
            else:
                # All retries failed
                return {
                    "status": "failed",
                    "error": str(e),
                    "attempts": max_retries
                }

Optimize for Performance

Use parallel execution and appropriate timeouts.

import asyncio

@app.reasoner()
async def optimized_workflow(items: list[dict]) -> dict:
    """Optimize multi-agent workflows for performance."""

    # Process items in parallel batches
    batch_size = 10
    results = []

    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]

        # Process batch in parallel
        batch_tasks = [
            app.call("processor.process_item", item=item)
            for item in batch
        ]

        batch_results = await asyncio.gather(
            *batch_tasks,
            return_exceptions=True
        )

        results.extend(batch_results)

    # Aggregate results
    successful = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]

    return {
        "total_items": len(items),
        "successful": len(successful),
        "failed": len(failed),
        "results": successful
    }

Use Appropriate Error Messages

Provide clear error context for debugging distributed workflows.

@app.reasoner()
async def well_documented_workflow(user_id: int) -> dict:
    """Provide clear error messages for debugging."""

    try:
        user_data = await app.call(
            "user-service.get_user",
            user_id=user_id
        )
    except Exception as e:
        # Clear error message with context
        raise Exception(
            f"Failed to retrieve user data for user_id={user_id}. "
            f"Error from user-service: {str(e)}"
        )

    try:
        analysis = await app.call(
            "analyzer.analyze_user",
            user_data=user_data
        )
    except Exception as e:
        # Include relevant context in error
        raise Exception(
            f"Failed to analyze user {user_id}. "
            f"User data: {user_data.get('name', 'unknown')}. "
            f"Error from analyzer: {str(e)}"
        )

    return analysis

Performance Considerations

Network Latency:

  • Cross-agent calls involve network round-trips through Agentfield server
  • Typical overhead: 10-50ms per call depending on network conditions
  • Use parallel execution for independent calls to minimize total time

Serialization:

  • Arguments are automatically serialized to JSON for network transmission
  • Complex objects (Pydantic models, custom classes) are converted automatically (see the sketch after this list)
  • Minimal overhead for typical data structures (< 5ms)
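
As a hedged illustration of the conversion point above (the OrderContext model and the "auditor.review_order" target are hypothetical, introduced only for this sketch):

from pydantic import BaseModel

class OrderContext(BaseModel):
    order_id: int
    total: float

@app.reasoner()
async def audit_order(order_id: int, total: float) -> dict:
    """Pass structured data to another agent; it is serialized to JSON in transit."""

    context = OrderContext(order_id=order_id, total=total)

    # The model instance is converted to a JSON-compatible payload automatically;
    # the receiving agent works with plain JSON data, and the result comes back as a dict.
    return await app.call(
        "auditor.review_order",  # hypothetical target agent and skill
        context=context
    )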

Workflow Tracking:

  • Every call creates workflow DAG entries for observability
  • Adds ~5-10ms overhead for tracking and context propagation
  • Essential for debugging and monitoring distributed systems

Async Execution:

  • Long-running workflows (> 60s) automatically use async execution
  • Polling mechanism adds minimal overhead
  • See Async Execution for optimization strategies

Identity Generation:

  • VC/DID generation happens asynchronously in background
  • Zero impact on execution performance
  • Credentials available for audit after execution completes