@app.reasoner
Define AI-powered functions with automatic workflow tracking
Decorator that transforms Python functions into AI-powered reasoners with automatic REST API endpoints, workflow tracking, and execution context management.
Basic Example
from agentfield import Agent
app = Agent(node_id="support-agent")
@app.reasoner
async def analyze_ticket(ticket_text: str) -> str:
    """Analyze support ticket and suggest resolution."""
    return await app.ai(
        system="You are a support ticket analyzer.",
        user=f"Analyze this ticket: {ticket_text}"
    )

Call the reasoner through its auto-generated API endpoint:

curl -X POST http://localhost:8080/api/v1/execute/support-agent.analyze_ticket \
-H "Content-Type: application/json" \
-d '{
"input": {
"ticket_text": "Login button not working on mobile app"
}
}'

Response:

{
"execution_id": "exec-abc123",
"workflow_id": "wf-def456",
"status": "completed",
"result": "This appears to be a mobile-specific UI issue...",
"duration_ms": 1250
}

Workflow DAG:

{
"nodes": [
{
"id": "support-agent.analyze_ticket",
"type": "reasoner",
"status": "completed"
}
],
"edges": []
}

Decorator Parameters
- path (str): custom URL path for the reasoner's endpoint (defaults to a path derived from the function name)
- tags (list[str]): metadata tags attached to the generated endpoint
- description (str): human-readable description of the reasoner
- vc_enabled (bool | None): overrides the agent-level verifiable credential policy; None (the default) inherits it
Workflow tracking is always enabled for @app.reasoner in the Python SDK.
There is no flag to disable it.
Common Patterns
Structured Output with Pydantic
Return type-safe, validated data using Pydantic models.
from pydantic import BaseModel
class TicketAnalysis(BaseModel):
    category: str
    priority: str
    estimated_resolution_time: int
    suggested_actions: list[str]
@app.reasoner
async def analyze_ticket_structured(ticket_text: str) -> TicketAnalysis:
    """Analyze ticket with structured output."""
    return await app.ai(
        system="Analyze support tickets systematically.",
        user=ticket_text,
        schema=TicketAnalysis  # Enforces structured output
    )

The schema parameter in app.ai() automatically instructs the LLM to format
responses according to your Pydantic model. The SDK validates and returns a
typed object.
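For example, when the reasoner is called directly from Python, the returned value is already a validated TicketAnalysis instance (a minimal sketch; the ticket text is illustrative):

import asyncio

async def main() -> None:
    # Direct Python call to the reasoner defined above (illustrative input).
    analysis = await analyze_ticket_structured("Login button not working on mobile app")
    # Field access is type-safe because the result is a TicketAnalysis instance.
    print(analysis.category, analysis.priority)
    for action in analysis.suggested_actions:
        print("-", action)

asyncio.run(main())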
Custom Paths and Tags
Organize reasoners with custom endpoints and metadata.
@app.reasoner(
    path="/tickets/analyze/v2",
    tags=["support", "ai", "v2"],
    description="Advanced ticket analysis with ML"
)
async def analyze_ticket_v2(ticket_text: str, context: dict) -> dict:
    """Enhanced analysis with additional context."""
    return await app.ai(
        user=f"Ticket: {ticket_text}\nContext: {context}"
    )

The API endpoint becomes: /api/v1/execute/support-agent/tickets/analyze/v2
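A quick way to exercise the custom path (a sketch using the requests library; the server address and input values are illustrative and follow the curl examples on this page):

import requests

resp = requests.post(
    "http://localhost:8080/api/v1/execute/support-agent/tickets/analyze/v2",
    json={"input": {"ticket_text": "Checkout fails on Safari", "context": {"plan": "pro"}}},
)
print(resp.json()["result"])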
Workflow Tracking Behavior
Workflow tracking, DAG building, and execution context propagation are always on for reasoners. There is no decorator flag to turn this off.
Controlling Verifiable Credential Generation
Override the agent-level VC policy for specific reasoners.
# Force VC generation even if agent has it disabled
@app.reasoner(vc_enabled=True)
async def audit_critical_decision(data: dict) -> dict:
    """Critical decision that must have verifiable credentials."""
    return await app.ai(
        system="Make compliance-critical decision",
        user=str(data)
    )
# Disable VC for high-frequency, low-risk operations
@app.reasoner(vc_enabled=False)
async def quick_categorization(text: str) -> str:
    """Fast categorization without VC overhead."""
    return await app.ai(
        system="Categorize: tech, business, or other",
        user=text
    )
# Inherit from agent-level setting (default)
@app.reasoner(vc_enabled=None)
async def standard_analysis(data: dict) -> dict:
    """Uses agent's default VC policy."""
    return await app.ai(user=str(data))

VC generation hierarchy: reasoner decorator → agent node → platform default (enabled).
Use vc_enabled=True for compliance-critical operations, False for high-frequency tasks, and None (default) to inherit the agent's policy.
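The resolution order can be pictured as plain Python (an illustration of the hierarchy above, not SDK code):

def resolve_vc_policy(reasoner_setting, agent_setting, platform_default=True):
    """Illustrative only: mirrors the reasoner -> agent -> platform hierarchy."""
    if reasoner_setting is not None:   # vc_enabled passed to @app.reasoner
        return reasoner_setting
    if agent_setting is not None:      # agent node's VC policy
        return agent_setting
    return platform_default            # platform default (enabled)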
Execution Context Access
Access workflow metadata and execution details.
from agentfield.execution_context import ExecutionContext
@app.reasoner
async def context_aware_analysis(
    ticket_text: str,
    execution_context: ExecutionContext = None
) -> dict:
    """Analysis with execution context awareness."""
    # Access workflow metadata
    workflow_id = execution_context.workflow_id if execution_context else None
    parent_id = execution_context.parent_execution_id if execution_context else None
    analysis = await app.ai(
        user=f"Analyze: {ticket_text}"
    )
    return {
        "analysis": analysis,
        "workflow_id": workflow_id,
        "parent_execution_id": parent_id
    }

The execution_context parameter is automatically injected when the reasoner
is called via the Agentfield server. It's None for direct Python function
calls.
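For example, calling the function directly from Python leaves the context empty (a minimal sketch; the ticket text is illustrative):

import asyncio

async def main() -> None:
    # No server involved, so execution_context stays None inside the reasoner.
    result = await context_aware_analysis("Checkout page times out")
    print(result["workflow_id"])           # None for direct calls
    print(result["parent_execution_id"])   # None for direct calls

asyncio.run(main())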
Router-Based Organization
Group related reasoners with AgentRouter for better namespace management.
from agentfield.router import AgentRouter
app = Agent(node_id="support-agent")
tickets = AgentRouter(prefix="Tickets/Analysis")
@tickets.reasoner(tags=["priority"])
async def classify_priority(ticket_text: str) -> str:
    """Classify ticket priority."""
    return await app.ai(
        system="Classify as: low, medium, high, critical",
        user=ticket_text
    )
@tickets.reasoner(tags=["routing"])
async def route_to_team(ticket_text: str, priority: str) -> str:
    """Route ticket to appropriate team."""
    return await app.ai(
        user=f"Route this {priority} priority ticket: {ticket_text}"
    )
app.include_router(tickets)

API endpoints:
- /api/v1/execute/support-agent.tickets_analysis_classify_priority
- /api/v1/execute/support-agent.tickets_analysis_route_to_team
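The generated names can also be used from other reasoners. A minimal sketch, assuming app.call() accepts the same dotted address as the HTTP endpoints above:

@app.reasoner
async def triage_ticket(ticket_text: str) -> dict:
    """Classify and route a ticket using the router-based reasoners."""
    priority = await app.call(
        "support-agent.tickets_analysis_classify_priority",
        ticket_text=ticket_text
    )
    team = await app.call(
        "support-agent.tickets_analysis_route_to_team",
        ticket_text=ticket_text,
        priority=priority
    )
    return {"priority": priority, "team": team}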
Cross-Agent Reasoner Calls
Call reasoners from other agents while maintaining workflow context.
@app.reasoner
async def comprehensive_analysis(ticket_text: str) -> dict:
    """Multi-agent ticket analysis."""
    # Call sentiment analyzer on different agent
    sentiment = await app.call(
        "sentiment-agent.analyze_sentiment",
        text=ticket_text
    )
    # Call priority classifier on same agent
    priority = await app.call(
        "support-agent.classify_priority",
        ticket_text=ticket_text
    )
    # Combine results with AI
    final_analysis = await app.ai(
        user=f"Combine: sentiment={sentiment}, priority={priority}",
        schema=TicketAnalysis
    )
    return final_analysis

Each app.call() creates a child execution context, building a complete
workflow DAG that shows the entire multi-agent orchestration.
Auto-Generated Features
Every @app.reasoner() decorated function automatically gets:
- REST API Endpoint: Accessible at /api/v1/execute/{agent_node_id}.{reasoner_name}
- Workflow Tracking: Creates execution contexts, builds DAGs, tracks parent-child relationships
- Pydantic Validation: Automatic conversion and validation of function arguments (FastAPI-like behavior)
- DID-Based Identity: Each execution gets a unique decentralized identifier for audit trails
- Execution Metadata: Automatic tracking of duration, timestamps, status, and results
API Calling Patterns
Synchronous Execution
For real-time responses (< 30 seconds).
curl -X POST http://localhost:8080/api/v1/execute/support-agent.analyze_ticket \
-H "Content-Type: application/json" \
-d '{
"input": {
"ticket_text": "Cannot reset password"
}
}'
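The same request from Python (a sketch using the requests library; the server address and ticket text mirror the curl example above):

import requests

resp = requests.post(
    "http://localhost:8080/api/v1/execute/support-agent.analyze_ticket",
    json={"input": {"ticket_text": "Cannot reset password"}},
)
body = resp.json()
print(body["status"], body["result"])  # fields shown in the response example earlier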
Asynchronous Execution
For long-running reasoners with webhook callbacks.
curl -X POST http://localhost:8080/api/v1/execute/async/support-agent.deep_analysis \
-H "Content-Type: application/json" \
-d '{
"input": {
"ticket_text": "Complex multi-part issue..."
},
"webhook": {
"url": "https://your-app.com/agentfield/callback",
"secret": "your-webhook-secret"
}
}'

Response:
{
"execution_id": "exec-abc123",
"workflow_id": "wf-def456",
"status": "queued",
"webhook_registered": true
}

Server-Sent Events
Real-time workflow updates via SSE.
curl -N -H "Accept: text/event-stream" \
  http://localhost:8080/api/v1/workflows/wf-def456/events

Event stream:
event: workflow.started
data: {"workflow_id": "wf-def456", "status": "running"}
event: execution.completed
data: {"execution_id": "exec-abc123", "status": "completed", "result": {...}}
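These events can also be consumed from Python (a sketch using the requests library; the workflow id and server address come from the example above):

import json
import requests

url = "http://localhost:8080/api/v1/workflows/wf-def456/events"
with requests.get(url, headers={"Accept": "text/event-stream"}, stream=True) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        # SSE data lines carry the JSON payloads shown in the event stream above.
        if line and line.startswith("data:"):
            event = json.loads(line[len("data:"):].strip())
            print(event.get("status"))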
Automatic Pydantic Conversion
Reasoners automatically convert function arguments to Pydantic models when type hints are provided.
from pydantic import BaseModel, Field
class TicketInput(BaseModel):
    text: str = Field(..., min_length=10)
    category: str = Field(..., pattern="^(bug|feature|support)$")
    priority: int = Field(default=3, ge=1, le=5)
@app.reasoner
async def validate_and_analyze(ticket: TicketInput) -> dict:
    """Automatic validation via Pydantic."""
    # ticket is already a validated TicketInput instance
    return await app.ai(
        user=f"Analyze {ticket.category} ticket: {ticket.text}"
    )

API call with validation:
curl -X POST http://localhost:8080/api/v1/execute/support-agent.validate_and_analyze \
-H "Content-Type: application/json" \
-d '{
"input": {
"ticket": {
"text": "Login button broken",
"category": "bug",
"priority": 4
}
}
}'

Invalid input returns a validation error:
{
"error": "Pydantic validation failed",
"details": {
"category": ["Input should be 'bug', 'feature', or 'support'"]
}
}
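The details in that response come from Pydantic's own validation errors; a small sketch, reusing the TicketInput model defined above:

from pydantic import ValidationError

try:
    TicketInput(text="Login button broken", category="invalid", priority=4)
except ValidationError as err:
    first = err.errors()[0]
    print(first["loc"], first["msg"])  # e.g. ('category',) and a "should match pattern" message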
Memory Integration
Access shared memory within reasoners for context persistence.
@app.reasoner
async def analyze_with_history(ticket_text: str, user_id: str) -> dict:
    """Analyze ticket with user history context."""
    # Get user's previous tickets from memory
    history = await app.memory.get(f"user_{user_id}_history", default=[])
    # Analyze with context
    analysis = await app.ai(
        system="Consider user's ticket history",
        user=f"Current: {ticket_text}\nHistory: {history}"
    )
    # Update history
    history.append({"ticket": ticket_text, "analysis": analysis})
    await app.memory.set(f"user_{user_id}_history", history)
    return analysis

Memory is automatically scoped to workflow/session/actor/global contexts. See app.memory for details.
Error Handling
Reasoners automatically track errors in workflow DAGs.
@app.reasoner
async def safe_analysis(ticket_text: str) -> dict:
    """Analysis with error handling."""
    try:
        result = await app.ai(user=ticket_text)
        return {"status": "success", "result": result}
    except Exception as e:
        # Error is automatically tracked in workflow DAG
        return {"status": "error", "message": str(e)}

Failed executions appear in the workflow DAG:
{
"execution_id": "exec-abc123",
"status": "failed",
"error": "LLM API rate limit exceeded",
"duration_ms": 500
}

Performance Considerations
Direct Function Calls vs app.call():
# Same agent - use direct import (fastest)
from reasoners.priority import classify_priority
@app.reasoner
async def quick_route(ticket_text: str) -> str:
    priority = classify_priority(ticket_text)  # Direct call, no network overhead
    return await app.ai(user=f"Route {priority} ticket: {ticket_text}")
# Different agent - use app.call() (enables workflow tracking)
@app.reasoner
async def cross_agent_route(ticket_text: str) -> str:
    priority = await app.call(
        "priority-agent.classify_priority",  # Network call, full DAG tracking
        ticket_text=ticket_text
    )
    return await app.ai(user=f"Route {priority} ticket: {ticket_text}")

Workflow Tracking Overhead:
- Workflow tracking adds roughly 5-10 ms per reasoner call for DAG updates.
- It cannot be disabled per reasoner; this overhead is what provides DAG visualization and execution context propagation.
Related
- Agent Node - Core agent initialization and configuration
- app.ai() - LLM interface used within reasoners
- app.call() - Cross-agent communication
- app.memory - Shared state management
- Execution Context - Workflow metadata and tracking
- Async Execution - Architecture and control plane
- Make Your Agent Async - Practical patterns and examples