Agent Node

Core component for creating distributed AI agent nodes in the Agentfield ecosystem

Agent is the foundational class for creating AI agent nodes that connect to Agentfield's distributed infrastructure. It inherits from FastAPI to provide HTTP endpoints and integrates with Agentfield's execution gateway, memory system, and workflow tracking.

What It Is

Agent is a FastAPI subclass that transforms Python functions into distributed, API-accessible AI services. When you create an Agent instance, you get:

  • Automatic REST API - Every reasoner and skill becomes an HTTP endpoint
  • Agentfield Server Integration - Service discovery, routing, and workflow orchestration
  • Execution Context - Automatic workflow DAG building and parent-child tracking
  • Memory Interface - Scoped persistent storage across distributed agents
  • MCP Integration - Dynamic skill generation from Model Context Protocol servers
  • Identity System - DID-based agent identity with verifiable credentials

Agentfield routes all app.call() requests through its execution gateway for proper workflow tracking, even when calling functions on the same agent. This ensures complete observability and DAG construction.

Basic Example

from agentfield import Agent
from pydantic import BaseModel

# Initialize agent
app = Agent(
    node_id="sentiment_analyzer",
    agentfield_server="http://localhost:8080"
)

class SentimentResult(BaseModel):
    sentiment: str
    confidence: float
    keywords: list[str]

# Define AI-powered reasoner
@app.reasoner()
async def analyze_sentiment(text: str) -> SentimentResult:
    """Analyze text sentiment using LLM."""
    return await app.ai(
        system="Analyze sentiment and extract keywords.",
        user=text,
        schema=SentimentResult
    )

# Define deterministic skill
@app.skill(tags=["utility"])
def format_result(sentiment: str, confidence: float) -> str:
    """Format sentiment analysis result."""
    return f"Sentiment: {sentiment} ({confidence:.0%} confidence)"

# Start server
if __name__ == "__main__":
    app.serve(port=8001)

Call the reasoner through the Agentfield server:

curl -X POST http://localhost:8080/api/v1/execute/sentiment_analyzer.analyze_sentiment \
  -H "Content-Type: application/json" \
  -d '{
    "input": {
      "text": "I love this product! It works amazingly well."
    }
  }'

Response:

{
  "execution_id": "exec-abc123",
  "workflow_id": "wf-def456",
  "status": "completed",
  "result": {
    "sentiment": "positive",
    "confidence": 0.95,
    "keywords": ["love", "product", "amazingly", "well"]
  },
  "duration_ms": 850,
  "timestamp": "2024-07-08T18:20:05Z"
}

Workflow DAG:

{
  "workflow_dag": {
    "nodes": [
      {
        "id": "sentiment_analyzer.analyze_sentiment",
        "type": "reasoner",
        "status": "completed",
        "duration_ms": 850
      }
    ],
    "edges": []
  }
}

Initialization Parameters

Constructor Parameters

# Basic initialization
app = Agent(node_id="customer_support")

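# Full configuration (AIConfig and MemoryConfig must be imported first;
# their import path is not shown in this section)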
app = Agent(
    node_id="my_agent",
    agentfield_server="https://agentfield.company.com",
    version="2.1.0",
    ai_config=AIConfig(model="gpt-4o", temperature=0.7),
    memory_config=MemoryConfig(memory_retention="persistent"),
    dev_mode=True,
    callback_url="http://my-agent.company.com:8001"
)

AIConfig Fields

MemoryConfig Fields

AsyncConfig Fields

Core Methods

app.ai()

Primary interface for LLM interactions with multimodal support.

async def ai(
    *args: Any,
    system: Optional[str] = None,
    user: Optional[str] = None,
    schema: Optional[Type[BaseModel]] = None,
    model: Optional[str] = None,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    stream: Optional[bool] = None,
    response_format: Optional[str] = None,
    context: Optional[Dict] = None,
    memory_scope: Optional[List[str]] = None,
    **kwargs
) -> Any

# Simple text prompt
response = await app.ai("What is the capital of France?")

# System + user pattern
response = await app.ai(
    system="You are a geography expert.",
    user="What is the capital of France?"
)

# Structured output validated against a Pydantic schema
from pydantic import BaseModel

class CapitalInfo(BaseModel):
    city: str
    country: str
    population: int

info = await app.ai(
    "Provide details about the capital of France.",
    schema=CapitalInfo
)
# Returns: CapitalInfo(city="Paris", country="France", population=2161000)

# Analyze image from URL
analysis = await app.ai(
    "Describe this image in detail.",
    "https://example.com/image.jpg"
)

# Mix text, images, and audio
response = await app.ai(
    "Compare the audio description with the visual content",
    "./product_review.wav",
    "https://example.com/product.jpg",
    "Additional context: Premium product line."
)

# Generate audio with default settings
audio_result = await app.ai_with_audio("Say hello warmly")
audio_result.audio.save("greeting.wav")

# Custom voice and format
audio_result = await app.ai_with_audio(
    "Explain quantum computing",
    voice="nova",
    format="mp3"
)
audio_result.audio.play()

# Generate image
image_result = await app.ai_with_vision("A sunset over mountains")
image_result.images[0].save("sunset.png")

# High-quality image
image_result = await app.ai_with_vision(
    "Futuristic cityscape with flying cars",
    size="1792x1024",
    quality="hd",
    style="vivid"
)

Returns: MultimodalResponse object with text, audio, and image access methods.

app.call()

Execute reasoners or skills on other agents via Agentfield's execution gateway.

async def call(
    target: str,
    *args,
    **kwargs
) -> dict

Parameters:

  • target - Format: "node_id.function_name" (e.g., "sentiment_analyzer.analyze_sentiment")
  • *args - Positional arguments (auto-mapped to function parameters)
  • **kwargs - Keyword arguments
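
The target format can be checked before a call is made. The sketch below is a hypothetical helper (split_target is not part of the Agentfield SDK) that splits a target string into its node ID and function name and rejects malformed input. It assumes that neither part contains a dot.

```python
def split_target(target: str) -> tuple[str, str]:
    """Split "node_id.function_name" into its two parts (hypothetical helper)."""
    node_id, sep, function_name = target.partition(".")
    if not sep or not node_id or not function_name or "." in function_name:
        raise ValueError(f"expected 'node_id.function_name', got {target!r}")
    return node_id, function_name


node_id, function_name = split_target("sentiment_analyzer.analyze_sentiment")
print(node_id, function_name)
```
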

@app.reasoner()
async def process_customer_feedback(customer_id: int, feedback: str) -> dict:
    # Call sentiment analyzer agent
    sentiment = await app.call(
        "sentiment_analyzer.analyze_sentiment",
        text=feedback
    )

    # Call risk analyzer with sentiment data
    risk = await app.call(
        "risk_analyzer.assess_risk",
        customer_id=customer_id,
        sentiment=sentiment.get("sentiment")
    )

    return {"sentiment": sentiment, "risk": risk}

All app.call() requests route through Agentfield server for proper workflow tracking and DAG construction, even when calling functions on the same agent.

Returns: dict - Always returns JSON/dict objects (no automatic schema conversion)
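
Because app.call() returns plain dicts, a caller that wants typed access has to convert the result itself. A minimal sketch of that conversion follows. It uses a standard-library dataclass in place of the Pydantic model from the basic example so that it runs without extra dependencies; to_sentiment_result is a hypothetical helper, and the sample dict mirrors the response shown earlier.

```python
from dataclasses import dataclass, field


@dataclass
class SentimentResult:
    sentiment: str
    confidence: float
    keywords: list[str] = field(default_factory=list)


def to_sentiment_result(data: dict) -> SentimentResult:
    """Convert a raw result dict into a typed object, failing loudly on missing keys."""
    return SentimentResult(
        sentiment=str(data["sentiment"]),
        confidence=float(data["confidence"]),
        keywords=list(data.get("keywords", [])),
    )


# Stand-in for the dict that app.call() would return.
raw = {"sentiment": "positive", "confidence": 0.95, "keywords": ["love", "product"]}
result = to_sentiment_result(raw)
print(result.sentiment, result.confidence)
```

Raising KeyError on a missing field is a deliberate choice here: a malformed response surfaces at the call site rather than later in the workflow.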

app.memory

Access to scoped persistent memory system.

@property
def memory() -> Optional[MemoryInterface]

@app.reasoner()
async def process_user_request(user_id: int, request: str) -> dict:
    # Store in workflow-scoped memory
    await app.memory.set("current_request", request)

    # Get from memory with default
    user_prefs = await app.memory.get(
        f"user_{user_id}_preferences",
        default={"theme": "light"}
    )

    # Check existence
    if await app.memory.exists(f"user_{user_id}_history"):
        history = await app.memory.get(f"user_{user_id}_history")

    return {"processed": True}

Memory Scopes:

  • app.memory - Automatic hierarchical scoping (workflow → session → actor → global)
  • app.memory.session(session_id) - Session-scoped memory
  • app.memory.actor(actor_id) - Actor-scoped memory
  • app.memory.global_scope - Global memory
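
One way to picture the hierarchical scoping is a lookup that tries the narrowest scope first and falls back to broader ones. The sketch below models that with collections.ChainMap over plain dicts. It illustrates the lookup order only, assumes reads fall through in the order listed above, and does not use or describe the real MemoryInterface; all keys and values are made up.

```python
from collections import ChainMap

# Stand-in stores, ordered narrowest to broadest (all names are illustrative).
workflow = {"current_request": "refund order 42"}
session = {"language": "en"}
actor = {"theme": "dark"}
global_scope = {"theme": "light", "support_email": "help@example.com"}

memory = ChainMap(workflow, session, actor, global_scope)

print(memory["current_request"])  # found in the workflow scope
print(memory["theme"])            # the actor value shadows the global one
print(memory.get("missing"))      # no scope has the key
```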

app.note()

Add execution notes for debugging and tracking.

def note(
    message: str,
    tags: Optional[List[str]] = None
) -> None

@app.reasoner()
async def process_data(data: str) -> dict:
    app.note("Starting data processing", ["debug", "processing"])

    result = await some_processing(data)  # some_processing stands in for your own coroutine

    app.note(f"Processing completed with {len(result)} items", ["info"])
    return result

Notes are fire-and-forget and run asynchronously without blocking execution.

Decorators

@app.reasoner()

Register AI-powered functions that use LLMs for complex reasoning.

def reasoner(
    path: Optional[str] = None,
    name: Optional[str] = None
) -> Callable

Parameters:

  • path - Custom API endpoint path (default: /reasoners/{function_name})
  • name - Explicit Agentfield registration ID (default: function name)

from pydantic import BaseModel

class AnalysisResult(BaseModel):
    summary: str
    key_findings: list[str]
    confidence: float

@app.reasoner()
async def analyze_document(document: str, focus: str = "general") -> AnalysisResult:
    """Analyze document with AI."""
    return await app.ai(
        system=f"Analyze this document focusing on: {focus}",
        user=document,
        schema=AnalysisResult
    )

Auto-Generated Features:

  • REST API endpoint: POST /reasoners/analyze_document
  • Input/output schema validation
  • Execution context injection
  • Workflow DAG tracking
  • Verifiable credential generation

@app.skill()

Register deterministic functions for business logic and integrations.

def skill(
    tags: Optional[List[str]] = None,
    path: Optional[str] = None,
    name: Optional[str] = None
) -> Callable

Parameters:

  • tags - List of tags for organization (e.g., ["database", "user"])
  • path - Custom API endpoint path (default: /skills/{function_name})
  • name - Explicit Agentfield registration ID (default: function name)

@app.skill(tags=["database", "user"])
def get_user_profile(user_id: int) -> dict:
    """Retrieve user profile from database."""
    user = database.get_user(user_id)
    if not user:
        raise ValueError(f"User {user_id} not found")

    return {
        "id": user.id,
        "name": user.name,
        "email": user.email,
        "created_at": user.created_at.isoformat()
    }

Auto-Generated Features:

  • REST API endpoint: POST /skills/get_user_profile
  • Input schema validation
  • Execution context injection
  • Workflow DAG tracking

@app.memory.on_change()

Subscribe to memory change events for reactive programming.

def on_change(
    pattern: Union[str, List[str]]
) -> Callable

# Single pattern
@app.memory.on_change("customer_*")
async def handle_customer_changes(event):
    customer_id = event.key.replace("customer_", "")
    if event.action == "set":
        await notify_customer_team(customer_id, event.data)

# Multiple patterns
@app.memory.on_change(["order_*.status", "order_*.payment"])
async def handle_order_updates(event):
    order_id = event.key.split('_')[1].split('.')[0]
    await process_order_update(order_id, event)
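
The patterns above look like shell-style wildcards. Assuming that is how keys are matched (the exact matching rules are not stated here), the standard-library fnmatch module gives a quick way to check which keys a pattern would select. The matches helper is hypothetical.

```python
from fnmatch import fnmatchcase
from typing import List, Union


def matches(key: str, pattern: Union[str, List[str]]) -> bool:
    """Return True if key matches the pattern, or any pattern in a list."""
    patterns = [pattern] if isinstance(pattern, str) else pattern
    return any(fnmatchcase(key, p) for p in patterns)


print(matches("customer_42", "customer_*"))                               # True
print(matches("order_7.status", ["order_*.status", "order_*.payment"]))  # True
print(matches("order_7.items", ["order_*.status", "order_*.payment"]))   # False
```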

Router System

Organize related reasoners and skills with AgentRouter.

from agentfield.router import AgentRouter

app = Agent(node_id="user_agent")

# Create router with prefix
profile = AgentRouter(prefix="Users/Profile-v1")

@profile.reasoner()
async def get_profile(user_id: str) -> dict:
    return {"user_id": user_id}

@profile.skill()
def update_preferences(user_id: str, preferences: dict) -> dict:
    return {"user_id": user_id, "preferences": preferences}

# Include router (prefix becomes: users_profile_v1_)
app.include_router(profile)

# Registered IDs:
# - users_profile_v1_get_profile
# - users_profile_v1_update_preferences

Prefix Translation:

  • "Billing" → billing_
  • "Support/Inbox" → support_inbox_
  • "API/v2/Users" → api_v2_users_
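
The translation rule can be inferred from the examples: lowercase the prefix, turn separators into underscores, and append a trailing underscore. The sketch below reproduces the listed outputs under that assumption. translate_prefix is a hypothetical function, and AgentRouter may handle edge cases differently.

```python
import re


def translate_prefix(prefix: str) -> str:
    """Lowercase the prefix and collapse every run of non-alphanumerics into one underscore."""
    slug = re.sub(r"[^a-z0-9]+", "_", prefix.lower()).strip("_")
    return f"{slug}_" if slug else ""


for prefix in ["Billing", "Support/Inbox", "API/v2/Users", "Users/Profile-v1"]:
    print(prefix, "->", translate_prefix(prefix))
```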

Server Lifecycle

serve()

Start the agent server with Agentfield integration.

def serve(
    port: Optional[int] = None,
    host: str = "0.0.0.0",
    dev: bool = False,
    heartbeat_interval: int = 2,
    auto_port: bool = False,
    **kwargs
) -> None

Parameters:

  • port - Server port (auto-discovers if None)
  • host - Host address (default: "0.0.0.0")
  • dev - Enable development mode with auto-reload
  • heartbeat_interval - Heartbeat frequency in seconds (default: 2)
  • auto_port - Auto-find available port (default: False)
  • **kwargs - Additional uvicorn parameters
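
How auto_port picks a port is not described here. A common technique, shown below as a generic sketch rather than Agentfield's implementation, is to bind a socket to port 0 and let the operating system choose a free port. find_free_port is a hypothetical helper.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


port = find_free_port()
print(port)
```

The port is released when the socket closes, so another process could claim it before the server binds. Treat the value as a good candidate, not a reservation.
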

# Basic server
app.serve()

# Development server
app.serve(
    port=8001,
    dev=True
)

# Production server
app.serve(
    port=8080,
    host="0.0.0.0"
)

Server Lifecycle:

  1. Initialize routes and handlers
  2. Start MCP servers (if configured)
  3. Register with Agentfield server
  4. Start heartbeat loop
  5. Ready to receive requests
  6. Graceful shutdown on SIGINT/SIGTERM
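
Steps 4 to 6 can be sketched with plain asyncio: a heartbeat task runs alongside request handling and stops cleanly when shutdown is requested. This is an illustration of the pattern, not the SDK's code. heartbeat_loop and send are hypothetical, and the short interval is only there to keep the demo fast.

```python
import asyncio
from typing import Awaitable, Callable


async def heartbeat_loop(
    send: Callable[[], Awaitable[None]],
    interval: float,
    stop: asyncio.Event,
) -> None:
    """Call send() every `interval` seconds until `stop` is set."""
    while not stop.is_set():
        await send()
        try:
            # Sleep for one interval, but wake up early if shutdown is requested.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass


async def main() -> int:
    beats = 0

    async def send() -> None:
        nonlocal beats
        beats += 1  # stand-in for an HTTP heartbeat request

    stop = asyncio.Event()
    task = asyncio.create_task(heartbeat_loop(send, interval=0.01, stop=stop))
    await asyncio.sleep(0.05)  # "serve requests" for a moment
    stop.set()                 # graceful shutdown signal
    await task
    return beats


beats = asyncio.run(main())
print(f"sent {beats} heartbeats")
```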

Auto-Generated Endpoints:

  • POST /reasoners/{name} - Execute reasoner
  • POST /skills/{name} - Execute skill
  • GET /health - Health check
  • GET /mcp/status - MCP server status
  • GET /docs - Swagger UI
  • GET /redoc - ReDoc documentation

Auto-Generated Features

When you create an Agent instance, Agentfield automatically provides:

REST API Endpoints

Every reasoner and skill becomes a REST endpoint:

# Synchronous execution
POST /api/v1/execute/{node_id}.{function_name}

# Asynchronous execution
POST /api/v1/execute/async/{node_id}.{function_name}

# Execution status
GET /api/v1/execute/status/{execution_id}
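
A client using the asynchronous endpoint typically submits the job and then polls the status endpoint. The sketch below builds the two URLs from the paths above and polls through an injected fetch_status callable, so it runs without a server. The "running" and "failed" status values are assumptions (only "completed" appears in this page), and all helper names are hypothetical.

```python
import time
from typing import Callable

TERMINAL_STATUSES = {"completed", "failed"}  # assumed; only "completed" is documented here


def execute_async_url(base_url: str, target: str) -> str:
    return f"{base_url.rstrip('/')}/api/v1/execute/async/{target}"


def status_url(base_url: str, execution_id: str) -> str:
    return f"{base_url.rstrip('/')}/api/v1/execute/status/{execution_id}"


def wait_for_result(
    fetch_status: Callable[[], dict], interval: float = 0.5, max_polls: int = 20
) -> dict:
    """Poll fetch_status() until the execution reaches a terminal status."""
    for _ in range(max_polls):
        body = fetch_status()
        if body.get("status") in TERMINAL_STATUSES:
            return body
        time.sleep(interval)
    raise TimeoutError("execution did not finish within the polling budget")


# Fake status endpoint: reports "running" twice, then "completed".
responses = iter([
    {"status": "running"},
    {"status": "running"},
    {"status": "completed", "result": {"ok": True}},
])
final = wait_for_result(lambda: next(responses), interval=0.01)
print(status_url("http://localhost:8080", "exec-abc123"))
print(final["status"])
```

In real use, fetch_status would issue a GET against status_url(...) with an HTTP client of your choice.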

Workflow DAG Tracking

All executions are tracked in a Directed Acyclic Graph:

{
  "workflow_dag": {
    "nodes": [
      {
        "id": "agent.reasoner_name",
        "type": "reasoner",
        "status": "completed",
        "duration_ms": 850
      }
    ],
    "edges": [
      {
        "from": "parent_agent.parent_reasoner",
        "to": "agent.reasoner_name"
      }
    ]
  }
}
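
To make the structure above concrete, the sketch below assembles the same nodes/edges shape from a flat list of execution records. The record format (with a "parent" key) and build_workflow_dag are hypothetical; the server's internal representation is not described here.

```python
def build_workflow_dag(executions: list[dict]) -> dict:
    """Turn flat execution records into the nodes/edges shape of a workflow DAG."""
    nodes = [
        {"id": e["id"], "type": e["type"], "status": e["status"], "duration_ms": e["duration_ms"]}
        for e in executions
    ]
    # Every record that names a parent contributes one parent -> child edge.
    edges = [{"from": e["parent"], "to": e["id"]} for e in executions if e.get("parent")]
    return {"workflow_dag": {"nodes": nodes, "edges": edges}}


records = [
    {"id": "orchestrator.process_user_feedback", "type": "reasoner",
     "status": "completed", "duration_ms": 1200, "parent": None},
    {"id": "sentiment_analyzer.analyze_sentiment", "type": "reasoner",
     "status": "completed", "duration_ms": 850,
     "parent": "orchestrator.process_user_feedback"},
]
dag = build_workflow_dag(records)
print(dag["workflow_dag"]["edges"])
```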

DID Registration

Agents automatically receive Decentralized Identifiers:

# Access agent DID
agent_did = app.did_manager.get_agent_did()
# Returns: "did:agentfield:agent:sentiment_analyzer"

# DIDs are used for:
# - Agent identity and authentication
# - Verifiable credential generation
# - Audit trails and compliance

MCP Skill Discovery (Beta)

MCP servers are automatically discovered and their tools become skills:

# MCP server tools automatically become callable skills
result = await app.call(
    "my_agent.github_api_create_issue",
    owner="myorg",
    repo="myrepo",
    title="Bug report",
    body="Description"
)

Execution Context Propagation

Every request automatically includes execution context:

@app.reasoner()
async def my_reasoner(data: str, execution_context: ExecutionContext = None) -> dict:
    if execution_context:
        print(f"Workflow ID: {execution_context.workflow_id}")
        print(f"Session ID: {execution_context.session_id}")
        print(f"Parent Execution: {execution_context.parent_execution_id}")

    return {"processed": True}

Production Patterns

Multi-Agent Orchestration

from agentfield import Agent
from pydantic import BaseModel

app = Agent(node_id="orchestrator")

class ProcessingResult(BaseModel):
    user_id: int
    sentiment_score: float
    risk_level: str
    actions_taken: list[str]

@app.reasoner()
async def process_user_feedback(user_id: int, feedback: str) -> ProcessingResult:
    """Orchestrate multi-agent workflow."""

    # Step 1: Analyze sentiment
    sentiment = await app.call(
        "sentiment_analyzer.analyze_sentiment",
        text=feedback
    )

    # Step 2: Assess risk
    risk = await app.call(
        "risk_analyzer.assess_user_risk",
        user_id=user_id,
        sentiment=sentiment.get("sentiment")
    )

    # Step 3: Generate recommendations
    recommendations = await app.call(
        "recommendation_engine.generate_recommendations",
        user_id=user_id,
        sentiment_data=sentiment,
        risk_data=risk
    )

    # Step 4: Take automated actions
    actions = []
    if risk.get("risk_level") == "high":
        notification = await app.call(
            "notification_agent.send_alert",
            type="high_risk_user",
            user_id=user_id
        )
        actions.append(f"Alert sent: {notification.get('alert_id')}")

    return ProcessingResult(
        user_id=user_id,
        sentiment_score=sentiment.get("confidence"),
        risk_level=risk.get("risk_level"),
        actions_taken=actions
    )

Error Handling

@app.reasoner()
async def resilient_workflow(user_id: int) -> dict:
    """Build resilient workflows with proper error handling."""
    results = {"user_id": user_id, "steps_completed": []}

    try:
        # Critical step - must succeed
        user_data = await app.call(
            "user_service.get_user_data",
            user_id=user_id
        )
        results["steps_completed"].append("user_data_retrieved")

        # Optional enhancement - can fail gracefully
        try:
            enrichment = await app.call(
                "enrichment_service.enrich_user_data",
                user_data=user_data
            )
            results["enriched_data"] = enrichment
            results["steps_completed"].append("data_enriched")
        except Exception as e:
            results["enrichment_error"] = str(e)
            # Continue with basic data

        # Final processing
        final_result = await app.call(
            "processor.finalize_processing",
            user_data=user_data,
            enriched_data=results.get("enriched_data")
        )

        results["final_result"] = final_result
        results["steps_completed"].append("processing_completed")

    except Exception as e:
        results["error"] = str(e)
        results["status"] = "failed"

    return results

Parallel Execution

import asyncio

@app.reasoner()
async def parallel_processing(items: list[dict]) -> dict:
    """Process items in parallel for better performance."""

    # Process items in parallel
    tasks = [
        app.call("item_processor.process_item", item=item)
        for item in items
    ]

    # Wait for all to complete
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Separate successes and failures
    successful = [r for r in results if not isinstance(r, Exception)]
    failed = [str(r) for r in results if isinstance(r, Exception)]

    # Aggregate results
    summary = await app.call(
        "aggregator.summarize_results",
        successful_count=len(successful),
        failed_count=len(failed),
        results=successful
    )

    return {
        "summary": summary,
        "successful_items": len(successful),
        "failed_items": len(failed)
    }
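
With large inputs, launching every call at once can overload the downstream agent. One common refinement, shown here as a sketch and not as an Agentfield feature, is to cap concurrency with asyncio.Semaphore. process_item is a stand-in for the app.call() above, and max_concurrency is a hypothetical parameter.

```python
import asyncio


async def process_item(item: dict) -> dict:
    """Stand-in for app.call("item_processor.process_item", item=item)."""
    await asyncio.sleep(0.01)
    if item.get("bad"):
        raise ValueError(f"cannot process item {item['id']}")
    return {"id": item["id"], "ok": True}


async def process_all(items: list[dict], max_concurrency: int = 3) -> tuple[list, list]:
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item: dict) -> dict:
        async with semaphore:  # at most max_concurrency calls in flight
            return await process_item(item)

    results = await asyncio.gather(*(guarded(i) for i in items), return_exceptions=True)
    successful = [r for r in results if not isinstance(r, Exception)]
    failed = [str(r) for r in results if isinstance(r, Exception)]
    return successful, failed


items = [{"id": n, "bad": n == 2} for n in range(5)]
successful, failed = asyncio.run(process_all(items))
print(len(successful), failed)
```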