app.call()
Cross-agent communication via Agentfield execution gateway with automatic workflow tracking
app.call()
Cross-agent communication via Agentfield execution gateway with automatic workflow tracking
Execute reasoners and skills on other agent nodes via the Agentfield execution gateway. Automatically propagates execution context, builds workflow DAGs, and handles service discovery without manual configuration.
Basic Example
from agentfield import Agent
app = Agent(node_id="support-router")
@app.reasoner()
async def route_ticket(ticket_id: int, message: str) -> dict:
"""Route support ticket based on sentiment analysis."""
# Call reasoner on another agent
sentiment = await app.call(
"sentiment-analyzer.analyze_text",
text=message
)
# Use the result (always returns dict)
if sentiment.get("is_urgent"):
return {"ticket_id": ticket_id, "action": "escalate"}
else:
return {"ticket_id": ticket_id, "action": "queue"}Return Type Behavior
- Same-Agent Direct Calls: Calling a reasoner/skill directly by function name within the same agent returns the actual Pydantic model (or whatever return type is defined)
- Cross-Agent
app.call(): Usingapp.call()to call any agent (including your own) returnsdictobjects (JSON-like), similar to calling a REST API
Future Enhancement: Automatic Pydantic model conversion for app.call() will be added in a future release. For now, use direct function calls for same-agent type-safe access, or manually convert dict responses from app.call().
Same-Agent: Direct Call vs app.call()
from pydantic import BaseModel
class SentimentResult(BaseModel):
sentiment: str
confidence: float
is_urgent: bool
@app.reasoner()
async def analyze_text(text: str) -> SentimentResult:
"""Returns SentimentResult Pydantic model."""
return await app.ai(
system="Analyze sentiment.",
user=text,
schema=SentimentResult
)
@app.reasoner()
async def process_feedback(feedback: str) -> dict:
"""Demonstrates the difference between direct call and app.call()."""
# ✅ Direct call - gets Pydantic model (same agent)
result: SentimentResult = await analyze_text(feedback)
print(result.sentiment) # Type-safe access
print(result.confidence) # IDE autocomplete works
# ❌ app.call() - gets dict (even for same agent!)
result_dict: dict = await app.call(
"my-agent.analyze_text", # Goes through Agentfield server
text=feedback
)
print(result_dict.get("sentiment")) # Dict access required
print(result_dict.get("confidence")) # No type safety
return {
"direct_call_type": "SentimentResult",
"app_call_type": "dict"
}When to Use Each Approach:
- Direct function calls: Use within the same agent for type-safe, Pydantic model returns
app.call(): Use for cross-agent communication or when you need workflow tracking/DAG building for same-agent calls
Even when calling your own agent's functions, app.call() routes through the Agentfield server and returns dict for consistency across all cross-agent communication.
Cross-Agent Pattern - Dict Returns
from pydantic import BaseModel
class SentimentResult(BaseModel):
sentiment: str
confidence: float
is_urgent: bool
@app.reasoner()
async def analyze_feedback(feedback: str) -> dict:
"""Cross-agent calls always return dict, not Pydantic models."""
# Cross-agent call - returns dict, not SentimentResult instance
result: dict = await app.call(
"sentiment-analyzer.analyze_text", # Different agent
text=feedback
)
# Must access as dict
sentiment = result.get("sentiment")
confidence = result.get("confidence")
return {
"feedback": feedback,
"sentiment": sentiment,
"confidence": confidence
}Parameters
Prop
Type
Returns: dict - JSON-like object containing the execution result. Always returns dict regardless of target function's return type annotation.
Routing Behavior
All Calls Route Through Agentfield Server: Every app.call() request goes through the Agentfield execution gateway, even when calling functions on the same agent node. This provides:
- Unified Interface: Consistent API for all cross-agent communication
- Service Discovery: Automatic routing without hardcoded URLs or IP addresses
- Distributed Execution: Call agents running on different servers, containers, or networks
- Consistent Return Type: Always returns dict for predictable cross-agent contracts
- Network Transparency: Same code works locally and in distributed deployments
This design enables true distributed agent systems where agents can be deployed anywhere and still communicate seamlessly.
How Routing Works
# Example: support-router calling sentiment-analyzer
@app.reasoner()
async def route_ticket(message: str) -> dict:
# 1. app.call() sends request to Agentfield server
# 2. Agentfield server looks up "sentiment-analyzer" in service registry
# 3. Agentfield server forwards request to sentiment-analyzer agent
# 4. sentiment-analyzer executes the function
# 5. Result flows back through Agentfield server
# 6. Workflow DAG is updated with parent-child relationship
sentiment = await app.call(
"sentiment-analyzer.analyze_text",
text=message
)
return {"sentiment": sentiment.get("sentiment")}Common Patterns
Cross-Agent Reasoner Call
Call AI-powered reasoners on other agents for distributed intelligence.
from agentfield import Agent
app = Agent(node_id="customer-service")
@app.reasoner()
async def handle_complaint(customer_id: int, complaint: str) -> dict:
"""Handle customer complaint using multiple specialized agents."""
# Analyze sentiment on specialized agent
sentiment = await app.call(
"sentiment-analyzer.analyze_text",
text=complaint
)
# Assess risk on another specialized agent
risk = await app.call(
"risk-analyzer.assess_customer_risk",
customer_id=customer_id,
sentiment=sentiment.get("sentiment"),
is_urgent=sentiment.get("is_urgent")
)
# Generate response based on analysis
if risk.get("risk_level") == "high":
action = "immediate_escalation"
else:
action = "standard_queue"
return {
"customer_id": customer_id,
"sentiment": sentiment.get("sentiment"),
"risk_level": risk.get("risk_level"),
"action": action
}Cross-Agent Skill Call
Call deterministic skills for business logic and integrations.
@app.reasoner()
async def process_order(order_id: int, customer_email: str) -> dict:
"""Process order using distributed skills."""
# Validate payment via payment agent
payment_result = await app.call(
"payment-agent.validate_payment",
order_id=order_id
)
if not payment_result.get("is_valid"):
return {"status": "payment_failed", "order_id": order_id}
# Update inventory via inventory agent
inventory_result = await app.call(
"inventory-agent.reserve_items",
order_id=order_id
)
# Send confirmation via notification agent
await app.call(
"notification-agent.send_email",
to=customer_email,
subject="Order Confirmed",
body=f"Order {order_id} confirmed"
)
return {
"status": "success",
"order_id": order_id,
"payment": payment_result,
"inventory": inventory_result
}Complex Multi-Agent Orchestration
Build sophisticated workflows spanning multiple agents.
@app.reasoner()
async def process_user_feedback(user_id: int, feedback: str) -> dict:
"""Orchestrate complex multi-agent workflow."""
# Step 1: Analyze sentiment
sentiment = await app.call(
"sentiment-analyzer.analyze_text",
text=feedback
)
# Step 2: Assess risk based on sentiment
risk = await app.call(
"risk-analyzer.assess_user_risk",
user_id=user_id,
sentiment=sentiment.get("sentiment"),
is_urgent=sentiment.get("is_urgent")
)
# Step 3: Generate recommendations
recommendations = await app.call(
"recommendation-engine.generate_recommendations",
user_id=user_id,
sentiment_data=sentiment,
risk_data=risk
)
# Step 4: Take automated actions if needed
actions = []
if risk.get("risk_level") == "high":
# Notify support team
alert = await app.call(
"notification-agent.send_alert",
type="high_risk_user",
user_id=user_id,
details=risk
)
actions.append(f"Alert sent: {alert.get('alert_id')}")
# Create priority ticket
ticket = await app.call(
"ticketing-agent.create_priority_ticket",
user_id=user_id,
feedback=feedback,
priority="high"
)
actions.append(f"Ticket created: {ticket.get('ticket_id')}")
return {
"user_id": user_id,
"sentiment_score": sentiment.get("confidence"),
"risk_level": risk.get("risk_level"),
"recommendations": recommendations.get("suggestions", []),
"actions_taken": actions
}Parallel Execution: Use asyncio.gather() for independent cross-agent
calls to reduce total execution time. Each call still goes through the
Agentfield server but executes concurrently. See example in accordion below.
Error Handling and Fallbacks
Build resilient workflows with proper error handling.
@app.reasoner()
async def resilient_workflow(user_id: int) -> dict:
"""Handle failures gracefully in multi-agent workflows."""
results = {
"user_id": user_id,
"steps_completed": [],
"errors": []
}
try:
# Critical step - must succeed
user_data = await app.call(
"user-service.get_user_data",
user_id=user_id
)
results["user_data"] = user_data
results["steps_completed"].append("user_data_retrieved")
except Exception as e:
results["errors"].append(f"Failed to get user data: {str(e)}")
results["status"] = "failed"
return results
# Optional enhancement - can fail gracefully
try:
enrichment = await app.call(
"enrichment-service.enrich_user_data",
user_data=user_data
)
results["enriched_data"] = enrichment
results["steps_completed"].append("data_enriched")
except Exception as e:
results["errors"].append(f"Enrichment failed: {str(e)}")
# Continue with basic data
# Final processing with available data
try:
final_result = await app.call(
"processor.finalize_processing",
user_data=user_data,
enriched_data=results.get("enriched_data")
)
results["final_result"] = final_result
results["steps_completed"].append("processing_completed")
results["status"] = "success"
except Exception as e:
results["errors"].append(f"Processing failed: {str(e)}")
results["status"] = "partial_success"
return resultsAsync Execution for Long-Running Workflows
For long-running multi-agent workflows, use async execution with polling.
@app.reasoner()
async def trigger_long_workflow(data: dict) -> dict:
"""Trigger long-running workflow asynchronously."""
# For very long workflows, use async execution
# The Agentfield server handles polling and result retrieval
result = await app.call(
"data-processor.process_large_dataset",
dataset=data,
processing_options={
"batch_size": 1000,
"parallel_workers": 10
}
)
# Result is automatically polled and returned
# See async execution docs for webhook patterns
return resultAsync Execution: Agentfield automatically handles async execution for long-running workflows. The SDK polls for results and returns them when complete. For advanced patterns including webhooks and custom polling, see Async Execution.
Type-Safe Pattern with Manual Conversion
Current workaround for type safety until automatic Pydantic conversion is available.
from pydantic import BaseModel
from typing import List
class UserProfile(BaseModel):
user_id: int
name: str
email: str
preferences: dict
class RecommendationList(BaseModel):
recommendations: List[str]
confidence: float
@app.reasoner()
async def get_personalized_recommendations(user_id: int) -> RecommendationList:
"""Type-safe cross-agent calls with manual conversion."""
# Get user profile (returns dict)
profile_dict = await app.call(
"user-service.get_profile",
user_id=user_id
)
# Convert to Pydantic for type safety
profile = UserProfile(**profile_dict)
# Get recommendations (returns dict)
recs_dict = await app.call(
"recommendation-engine.generate_recommendations",
user_id=profile.user_id,
preferences=profile.preferences
)
# Convert to Pydantic for type safety
recommendations = RecommendationList(**recs_dict)
# Type-safe access throughout
if recommendations.confidence > 0.8:
# High confidence recommendations
return recommendations
else:
# Fallback to default recommendations
return RecommendationList(
recommendations=["default_item_1", "default_item_2"],
confidence=0.5
)Advanced Features
Zero-Latency Identity Generation
Zero-Latency Identity: Verifiable Credentials (VCs) and Decentralized Identifiers (DIDs) are generated asynchronously in the background and do not block execution. Your workflows run at full speed while identity credentials are created for audit trails and compliance.
@app.reasoner()
async def compliant_workflow(data: dict) -> dict:
"""Workflow with automatic identity generation."""
# Execute cross-agent call
result = await app.call(
"processor.process_data",
data=data
)
# VC/DID generation happens asynchronously
# No performance impact on your workflow
# Credentials available for audit via Agentfield UI
return resultTimeout Configuration
Configure execution timeouts for different workflow patterns.
from agentfield import Agent, AsyncConfig
# Configure timeouts at agent level
app = Agent(
node_id="orchestrator",
async_config=AsyncConfig(
default_execution_timeout=300.0, # 5 minutes default
max_execution_timeout=600.0, # 10 minutes max
enable_async_execution=True,
fallback_to_sync=True
)
)
@app.reasoner()
async def long_running_workflow(data: dict) -> dict:
"""Workflow with appropriate timeout handling."""
try:
# Long-running cross-agent call
# Uses configured timeout (300s default, 600s max)
result = await app.call(
"data-processor.process_large_dataset",
dataset=data
)
return result
except asyncio.TimeoutError:
# Handle timeout gracefully
return {
"status": "timeout",
"message": "Processing exceeded timeout limit"
}Fallback Strategies
Implement fallback logic for resilient multi-agent systems.
@app.reasoner()
async def resilient_analysis(text: str) -> dict:
"""Use fallback agents if primary fails."""
# Try primary sentiment analyzer
try:
sentiment = await app.call(
"sentiment-analyzer-primary.analyze_text",
text=text
)
return {
"sentiment": sentiment.get("sentiment"),
"source": "primary"
}
except Exception as e:
# Fallback to secondary analyzer
try:
sentiment = await app.call(
"sentiment-analyzer-backup.analyze_text",
text=text
)
return {
"sentiment": sentiment.get("sentiment"),
"source": "backup"
}
except Exception as e2:
# Final fallback to simple rule-based analysis
return {
"sentiment": "neutral",
"source": "fallback",
"error": str(e2)
}Best Practices
Design for Observability
Every cross-agent call creates a traceable workflow edge in the DAG.
@app.reasoner()
async def observable_workflow(order_id: int) -> dict:
"""Design workflows with clear, observable steps."""
# Each call creates a node in the workflow DAG
# Visible in Agentfield UI for debugging and monitoring
# Step 1: Validate
validation = await app.call(
"validator.validate_order",
order_id=order_id
)
# Step 2: Process
if validation.get("is_valid"):
processing = await app.call(
"processor.process_order",
order_id=order_id
)
# Step 3: Notify
notification = await app.call(
"notifier.send_confirmation",
order_id=order_id,
result=processing
)
return {
"status": "success",
"order_id": order_id,
"processing": processing,
"notification": notification
}
else:
return {
"status": "validation_failed",
"errors": validation.get("errors", [])
}Handle Network Failures
Cross-agent calls involve network communication - handle failures appropriately.
import asyncio
@app.reasoner()
async def network_resilient_workflow(data: dict) -> dict:
"""Handle network failures in cross-agent calls."""
max_retries = 3
retry_delay = 1.0
for attempt in range(max_retries):
try:
result = await app.call(
"external-service.process_data",
data=data
)
return result
except Exception as e:
if attempt < max_retries - 1:
# Exponential backoff
await asyncio.sleep(retry_delay * (2 ** attempt))
continue
else:
# All retries failed
return {
"status": "failed",
"error": str(e),
"attempts": max_retries
}Optimize for Performance
Use parallel execution and appropriate timeouts.
import asyncio
@app.reasoner()
async def optimized_workflow(items: list[dict]) -> dict:
"""Optimize multi-agent workflows for performance."""
# Process items in parallel batches
batch_size = 10
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
# Process batch in parallel
batch_tasks = [
app.call("processor.process_item", item=item)
for item in batch
]
batch_results = await asyncio.gather(
*batch_tasks,
return_exceptions=True
)
results.extend(batch_results)
# Aggregate results
successful = [r for r in results if not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
return {
"total_items": len(items),
"successful": len(successful),
"failed": len(failed),
"results": successful
}Use Appropriate Error Messages
Provide clear error context for debugging distributed workflows.
@app.reasoner()
async def well_documented_workflow(user_id: int) -> dict:
"""Provide clear error messages for debugging."""
try:
user_data = await app.call(
"user-service.get_user",
user_id=user_id
)
except Exception as e:
# Clear error message with context
raise Exception(
f"Failed to retrieve user data for user_id={user_id}. "
f"Error from user-service: {str(e)}"
)
try:
analysis = await app.call(
"analyzer.analyze_user",
user_data=user_data
)
except Exception as e:
# Include relevant context in error
raise Exception(
f"Failed to analyze user {user_id}. "
f"User data: {user_data.get('name', 'unknown')}. "
f"Error from analyzer: {str(e)}"
)
return analysisPerformance Considerations
Network Latency:
- Cross-agent calls involve network round-trips through Agentfield server
- Typical overhead: 10-50ms per call depending on network conditions
- Use parallel execution for independent calls to minimize total time
Serialization:
- Arguments are automatically serialized to JSON for network transmission
- Complex objects (Pydantic models, custom classes) are converted automatically
- Minimal overhead for typical data structures (< 5ms)
Workflow Tracking:
- Every call creates workflow DAG entries for observability
- Adds ~5-10ms overhead for tracking and context propagation
- Essential for debugging and monitoring distributed systems
Async Execution:
- Long-running workflows (> 60s) automatically use async execution
- Polling mechanism adds minimal overhead
- See Async Execution for optimization strategies
Identity Generation:
- VC/DID generation happens asynchronously in background
- Zero impact on execution performance
- Credentials available for audit after execution completes
Related
- Async Execution - Detailed async patterns and webhooks
- Execution Context - Context propagation and tracking
- @app.reasoner - Define callable AI-powered reasoners
- @app.skill - Define callable deterministic skills
- Agent Node - Initialize agents with AsyncConfig
- Workflow DAG - Visualize multi-agent workflows