app.memory

Shared state across distributed agents with automatic scoping and real-time events

app.memory is a persistent memory system that lets distributed agents share state. Values are automatically scoped to workflow, session, actor, or global contexts, with zero configuration.

Zero-Config Advantage: Unlike traditional agent frameworks that require manual vector stores or shared state management, Agentfield's memory automatically scopes to execution context. No setup, no key management, no coordination code.

Basic Example

from agentfield import Agent

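app = Agent(node_id="autonomous_recruiter")

# Assumed to be defined elsewhere: CandidateAnalysis (the response schema)
# and get_job_requirements() (an async helper that loads the job requirements).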

@app.reasoner
async def intelligent_candidate_screening(resume: dict, job_id: str):
    """AI screens candidates autonomously, learning from hiring patterns.

    Replaces: Manual resume review, keyword matching systems,
    and rigid applicant tracking workflows.
    """

    # AI analyzes candidate fit - no hardcoded criteria
    screening_analysis = await app.ai(
        system="You are an expert recruiter. Analyze candidate fit considering skills, experience, culture, and growth potential.",
        user=f"Resume: {resume}\nJob requirements: {await get_job_requirements(job_id)}",
        schema=CandidateAnalysis
    )

    # Store AI's reasoning in workflow memory for other agents
    await app.memory.set(f"candidate_{resume['id']}_analysis", {
        "fit_score": screening_analysis.fit_score,
        "strengths": screening_analysis.strengths,
        "concerns": screening_analysis.concerns,
        "ai_recommendation": screening_analysis.recommendation,
        "reasoning": screening_analysis.detailed_reasoning
    })

    # Get hiring manager preferences from session memory
    session_mem = app.memory.session(f"hiring_manager_{job_id}")
    manager_preferences = await session_mem.get("screening_preferences", default={
        "min_fit_score": 70,
        "auto_advance_threshold": 85
    })

    # AI decides next action autonomously
    if screening_analysis.fit_score >= manager_preferences["auto_advance_threshold"]:
        # High-quality candidate - AI autonomously schedules interview
        await app.memory.set(f"interview_scheduled_{resume['id']}", {
            "candidate_id": resume['id'],
            "job_id": job_id,
            "scheduled_by": "ai_autonomous",
            "fit_score": screening_analysis.fit_score
        })
        return {"decision": "auto_advance", "fit_score": screening_analysis.fit_score}

    elif screening_analysis.fit_score >= manager_preferences["min_fit_score"]:
        # Good candidate - flag for human review
        await app.memory.set(f"review_queue_{job_id}", {
            "candidate_id": resume['id'],
            "priority": "medium",
            "ai_analysis": screening_analysis
        })
        return {"decision": "human_review", "fit_score": screening_analysis.fit_score}

    else:
        # Not a fit - return a reject decision (nothing is written to memory)
        return {"decision": "reject", "fit_score": screening_analysis.fit_score}

What are Agent A and Agent B?

In Agentfield, "Agent A" and "Agent B" refer to distributed agent nodes running as separate processes (like microservices), each connected to the same Agentfield control plane. This architecture enables true distributed agent systems where agents can seamlessly share state without manual coordination.

Learn more about distributed agent nodes in the Agent Node documentation.

# Agent A (support-agent) - Distributed node on server 1
@app.reasoner
async def analyze_ticket(ticket_id: str):
    await app.memory.set("ticket_priority", "high")
    await app.memory.set("ticket_category", "billing")

    # Call specialist agent on different server
    result = await app.call(
        "specialist-agent.handle_billing",
        ticket_id=ticket_id
    )
    return result

# Agent B (specialist-agent) - Distributed node on server 2
@app.reasoner
async def handle_billing(ticket_id: str):
    # Automatically accesses Agent A's workflow memory
    priority = await app.memory.get("ticket_priority")  # "high"
    category = await app.memory.get("ticket_category")  # "billing"

    # Both agents share workflow context - zero config
    return {"priority": priority, "category": category}

1. Agent A (server 1) sets workflow memory
2. Agent A calls Agent B via app.call()
3. Agentfield propagates execution context automatically
4. Agent B (server 2) reads same workflow memory
5. No manual key coordination or message passing needed

This works because both agents connect to the same Agentfield control plane, which manages memory and context.
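
The flow can be pictured with a toy, in-process model. The sketch below is not Agentfield's implementation: `ToyControlPlane`, `agent_a`, and `agent_b` are hypothetical stand-ins, and the workflow ID is passed by hand where Agentfield would propagate the execution context. It only illustrates why two agents that share a workflow ID see the same keys.

```python
import asyncio
from collections import defaultdict


class ToyControlPlane:
    """Hypothetical stand-in: one memory dict per workflow ID."""

    def __init__(self):
        self._workflows = defaultdict(dict)

    async def set(self, workflow_id, key, value):
        self._workflows[workflow_id][key] = value

    async def get(self, workflow_id, key, default=None):
        return self._workflows[workflow_id].get(key, default)


async def agent_b(plane, workflow_id):
    # Agent B receives the workflow ID with the call and reads the same store.
    return {
        "priority": await plane.get(workflow_id, "ticket_priority"),
        "category": await plane.get(workflow_id, "ticket_category"),
    }


async def agent_a(plane, workflow_id):
    await plane.set(workflow_id, "ticket_priority", "high")
    await plane.set(workflow_id, "ticket_category", "billing")
    # The "call" carries the workflow ID, mimicking context propagation.
    return await agent_b(plane, workflow_id)


result = asyncio.run(agent_a(ToyControlPlane(), "wf-demo"))
print(result)
```

In this model the only thing the two agents exchange is the workflow ID; the values themselves live in the shared store.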

Scope Hierarchy

Memory automatically resolves through four hierarchical scopes. When you call app.memory.get(), Agentfield searches the workflow, session, actor, and global scopes, in that order, and stops at the first scope that contains the key.
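
A minimal sketch of that lookup order, using plain dictionaries in place of real scopes. The `resolve` function and the `scopes` layout are hypothetical and exist only to illustrate first-match-wins resolution; they are not part of the Agentfield API.

```python
# Lookup order described in this page: workflow -> session -> actor -> global
SCOPE_ORDER = ("workflow", "session", "actor", "global")
_MISSING = object()  # sentinel so stored None/False values still count as hits


def resolve(key, scopes, default=None):
    """Return the value from the first scope that contains `key`."""
    for name in SCOPE_ORDER:
        value = scopes.get(name, {}).get(key, _MISSING)
        if value is not _MISSING:
            return value
    return default


scopes = {
    "workflow": {"customer_tier": "premium"},
    "session": {"customer_tier": "standard", "theme": "dark"},
    "global": {"api_version": "2.1.0"},
}

tier = resolve("customer_tier", scopes)     # workflow shadows session
theme = resolve("theme", scopes)            # found in session
version = resolve("api_version", scopes)    # falls through to global
missing = resolve("unknown", scopes, default="n/a")
```

A key set in a narrower scope shadows the same key in a broader one, which is why the workflow value wins for `customer_tier` here.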

When to Use Each Scope

Workflow scope: Multi-agent task coordination - Automatically scoped to the current workflow execution.

When to use:

  • Passing data between agents in a multi-step workflow
  • Tracking progress through complex agent orchestrations
  • Sharing intermediate results during task execution
  • Coordinating state across distributed agent calls

Application examples:

  • Customer support ticket routing (priority, category, customer context)
  • Document processing pipelines (extracted data, validation status)
  • Multi-agent research tasks (findings, sources, synthesis state)
  • Order fulfillment workflows (inventory checks, payment status, shipping info)

Lifetime: Exists only for the duration of the workflow execution. Automatically cleaned up when workflow completes.

# Workflow memory is automatic - no scope needed
await app.memory.set("processing_stage", "validation")
await app.memory.set("extracted_entities", entities)

Session scope: User session persistence - Maintains state across multiple workflow executions for the same user.

When to use:

  • Storing user preferences and settings
  • Maintaining conversation history across interactions
  • Tracking user journey and behavior patterns
  • Caching user-specific data for performance

Application examples:

  • Chatbot conversation history (messages, context, user preferences)
  • E-commerce shopping sessions (cart, browsing history, recommendations)
  • Multi-turn form filling (partial data, validation state)
  • Personalized agent behavior (user's preferred communication style)

Lifetime: Persists until explicitly deleted. Survives workflow completions and agent restarts.

# Session memory requires explicit session ID
session_mem = app.memory.session(user_id)
await session_mem.set("conversation_history", messages)
await session_mem.set("user_preferences", prefs)

Actor scope: Agent-private state - Isolated to a specific agent instance, not shared with other agents.

When to use:

  • Agent-specific configuration and settings
  • Internal caching for performance optimization
  • Rate limiting and quota tracking per agent
  • Agent health metrics and diagnostics

Application examples:

  • API rate limit tracking (requests per minute, quota remaining)
  • Agent-specific model configurations (temperature, max tokens)
  • Internal performance metrics (response times, error rates)
  • Private caching of frequently accessed data

Lifetime: Persists across workflow executions. Tied to the agent instance identifier.

# Actor memory requires explicit actor ID
actor_mem = app.memory.actor(app.node_id)
await actor_mem.set("api_quota_remaining", 950)
await actor_mem.set("cache_hit_rate", 0.85)

Global scope: System-wide configuration - Shared across all agents, workflows, and sessions.

When to use:

  • Application-wide configuration and feature flags
  • Shared lookup tables and reference data
  • System-wide counters and metrics
  • Global rate limits and quotas

Application examples:

  • Feature flags (enable/disable features across all agents)
  • Pricing tiers and limits (free: 100, premium: 1000)
  • API endpoints and service URLs
  • System-wide rate limits and circuit breakers

Lifetime: Permanent until explicitly deleted. Survives all restarts and deployments.

# Global memory is accessed via global_scope
await app.memory.global_scope.set("feature_flags", {
    "new_ui": True,
    "beta_api": False
})
await app.memory.global_scope.set("tier_limits", limits)

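Scope Selection Quick Reference:

  • Workflow - multi-agent task coordination; accessed with app.memory.set() / app.memory.get(); exists only for the duration of the workflow execution
  • Session - user session persistence; accessed with app.memory.session(session_id); persists until explicitly deleted
  • Actor - agent-private state; accessed with app.memory.actor(actor_id); persists across workflow executions
  • Global - system-wide configuration; accessed with app.memory.global_scope; permanent until explicitly deleted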

Scope ID Patterns

# Workflow scope - automatic from execution context
# ID format: wf-{uuid} (e.g., "wf-abc123def456")
await app.memory.set("task_status", "processing")

# Session scope - user/session identifier
# ID format: Your choice (e.g., "user_12345", "session_abc")
session_mem = app.memory.session("user_12345")
await session_mem.set("preferences", {"theme": "dark"})

# Actor scope - agent instance identifier
# ID format: {agent_node_id} or custom (e.g., "support-agent-001")
actor_mem = app.memory.actor("support-agent-001")
await actor_mem.set("queue_position", 3)

# Global scope - no ID needed
await app.memory.global_scope.set("api_version", "2.1.0")

Production Tip: Use consistent ID patterns across your system. For sessions, use your auth system's user IDs. For actors, use agent node IDs or instance identifiers from your orchestration platform.
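
One way to keep ID patterns consistent is to build them in a single place. The helpers below are a hypothetical sketch (neither function is part of Agentfield); the formats mirror the example IDs used above, `user_12345` and `support-agent-001`.

```python
def session_id_for(user_id: str) -> str:
    """Build a session scope ID from an auth-system user ID."""
    return f"user_{user_id}"


def actor_id_for(node_id: str, instance: int) -> str:
    """Build an actor scope ID from a node ID and a zero-padded instance number."""
    return f"{node_id}-{instance:03d}"


session_id = session_id_for("12345")
actor_id = actor_id_for("support-agent", 1)
print(session_id, actor_id)
```

The results can be passed to app.memory.session() and app.memory.actor() so that every agent derives the same scope ID for the same user or instance.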

Core Operations

set()

Store a value with automatic scoping.

# Automatic scoping (uses current workflow)
await app.memory.set("customer_tier", "premium")

# Complex data structures
await app.memory.set("customer_profile", {
    "id": "cust_123",
    "tier": "premium",
    "preferences": {"notifications": True}
})

# For other scopes, use scoped clients
await app.memory.global_scope.set("global_config", config_data)
await app.memory.session("user_123").set("preferences", prefs)

get()

Retrieve a value with hierarchical lookup.

# Hierarchical lookup (workflow → session → actor → global)
tier = await app.memory.get("customer_tier", default="standard")

# With default value
preferences = await app.memory.get("user_preferences", default={
    "theme": "light",
    "notifications": True
})

# For specific scope lookup, use scoped clients
global_config = await app.memory.global_scope.get("api_config")
session_prefs = await app.memory.session("user_123").get("preferences")

exists()

Check if a key exists in any scope.

# Check before accessing
if await app.memory.exists("customer_history"):
    history = await app.memory.get("customer_history")
else:
    history = []

# Check in specific scope
if await app.memory.global_scope.exists("feature_flags"):
    flags = await app.memory.global_scope.get("feature_flags")

delete()

Remove a value from memory.

# Delete from current scope
await app.memory.delete("temp_session_data")

# Delete from specific scope
await app.memory.session("user_123").delete("cached_results")

list_keys()

List all keys in a specific scope. Use scoped memory clients to list keys.

# List session keys
session_mem = app.memory.session("user_123")
session_keys = await session_mem.list_keys()

# List workflow keys
workflow_mem = app.memory.workflow("wf-abc123")
workflow_keys = await workflow_mem.list_keys()

# List global keys
global_keys = await app.memory.global_scope.list_keys()

Scoped Memory Clients

Access memory in specific scopes with dedicated clients.

session()

Session-scoped memory for user-specific data.

# Get session client
session_mem = app.memory.session("user_12345")

# Session operations
await session_mem.set("preferences", {"theme": "dark"})
prefs = await session_mem.get("preferences")
await session_mem.delete("temp_data")
keys = await session_mem.list_keys()

# Use in reasoners
@app.reasoner
async def update_user_settings(user_id: str, settings: dict):
    session_mem = app.memory.session(user_id)
    await session_mem.set("settings", settings)
    return {"updated": True}

actor()

Actor-scoped memory for agent-private state.

# Get actor client
actor_mem = app.memory.actor("support-agent-001")

# Private agent state
await actor_mem.set("queue_position", 5)
await actor_mem.set("internal_cache", {"last_sync": "2025-01-15"})

# Agent instance configuration
@app.reasoner
async def configure_agent(agent_id: str, config: dict):
    actor_mem = app.memory.actor(agent_id)
    await actor_mem.set("config", config)
    return {"configured": agent_id}

workflow()

Workflow-scoped memory for explicit workflow access.

# Get workflow client
workflow_mem = app.memory.workflow("wf-abc123")

# Workflow state management
await workflow_mem.set("current_step", "processing")
await workflow_mem.set("results", {"step1": "complete"})

# Cross-workflow access
@app.reasoner
async def check_workflow_status(workflow_id: str):
    workflow_mem = app.memory.workflow(workflow_id)
    status = await workflow_mem.get("current_step")
    return {"workflow_id": workflow_id, "status": status}

global_scope

Global memory for system-wide shared data.

# Global configuration
await app.memory.global_scope.set("api_version", "2.1.0")
await app.memory.global_scope.set("feature_flags", {
    "new_ui": True,
    "beta_features": False
})

# Global counters
current_count = await app.memory.global_scope.get("request_count", default=0)
await app.memory.global_scope.set("request_count", current_count + 1)

# System-wide lookup tables
await app.memory.global_scope.set("tier_limits", {
    "free": 100,
    "premium": 1000,
    "enterprise": 10000
})

Common Patterns

Cross-Agent Memory Sharing

Agents automatically share workflow memory through execution context.

# Agent A: Customer Service
@app.reasoner
async def handle_inquiry(customer_id: str, inquiry: str):
    # Set workflow context
    await app.memory.set("customer_id", customer_id)
    await app.memory.set("inquiry_type", "technical")
    await app.memory.set("priority", "high")

    # Call specialist agent
    result = await app.call(
        "technical-agent.analyze_issue",
        inquiry=inquiry
    )
    return result

# Agent B: Technical Specialist (different server)
@app.reasoner
async def analyze_issue(inquiry: str):
    # Access workflow memory set by Agent A
    customer_id = await app.memory.get("customer_id")
    priority = await app.memory.get("priority")  # "high"

    # Add analysis to shared workflow memory
    await app.memory.set("analysis_result", {
        "category": "bug",
        "severity": "critical",
        "estimated_fix": "2 hours"
    })

    return {"customer_id": customer_id, "priority": priority}

Execution context propagates automatically through app.call(). No manual header management or context passing required.

Session Persistence Across Workflows

Maintain user state across multiple workflow executions.

from datetime import datetime

@app.reasoner
async def chat_with_history(user_id: str, message: str):
    # Get session memory for this user
    session_mem = app.memory.session(user_id)

    # Retrieve conversation history (persists across workflows)
    history = await session_mem.get("conversation_history", default=[])

    # Add new message
    history.append({
        "role": "user",
        "content": message,
        "timestamp": datetime.now().isoformat()
    })

    # Generate response with history context
    response = await app.ai(
        system="You are a helpful assistant.",
        user=f"History: {history[-5:]}\nNew message: {message}"
    )

    # Update history with response
    history.append({
        "role": "assistant",
        "content": response,
        "timestamp": datetime.now().isoformat()
    })

    # Save back to session memory
    await session_mem.set("conversation_history", history)

    return response

Conditional Memory Updates

Update memory only when conditions are met.

from datetime import datetime

@app.reasoner
async def update_customer_tier(customer_id: str, new_tier: str):
    # Get current customer data
    customer = await app.memory.get(f"customer_{customer_id}")

    # Nothing to update if the customer record is missing or empty
    if not customer:
        return {"updated": False, "tier": None}

    # Capture the old tier before it is overwritten
    old_tier = customer.get("tier")

    if old_tier != new_tier:
        # Update tier and track change
        customer["tier"] = new_tier
        customer["tier_updated_at"] = datetime.now().isoformat()

        await app.memory.set(f"customer_{customer_id}", customer)

        # Set notification flag
        await app.memory.set(f"tier_change_{customer_id}", {
            "old_tier": old_tier,
            "new_tier": new_tier,
            "timestamp": datetime.now().isoformat()
        })

        return {"updated": True, "tier": new_tier}

    return {"updated": False, "tier": old_tier}

Memory-Based State Machines

Implement workflow state transitions with memory.

from datetime import datetime

@app.reasoner
async def advance_workflow_state(event: str):
    # Get current state
    current_state = await app.memory.get("workflow_state", default="initial")

    # Define state transitions
    transitions = {
        "initial": {"start": "processing"},
        "processing": {"complete": "completed", "error": "failed"},
        "completed": {"restart": "initial"},
        "failed": {"retry": "processing", "abort": "aborted"}
    }

    # Check if transition is valid
    if current_state in transitions and event in transitions[current_state]:
        new_state = transitions[current_state][event]

        # Update state
        await app.memory.set("workflow_state", new_state)

        # Track state history
        history = await app.memory.get("state_history", default=[])
        history.append({
            "from": current_state,
            "to": new_state,
            "event": event,
            "timestamp": datetime.now().isoformat()
        })
        await app.memory.set("state_history", history)

        return {"transitioned": True, "new_state": new_state}

    return {"transitioned": False, "current_state": current_state}
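
The transition table can also be exercised without any memory backend, which makes the rules easy to unit test. This is a sketch under that assumption: `next_state` and `run_events` are hypothetical helpers, and the table is copied from the reasoner above.

```python
TRANSITIONS = {
    "initial": {"start": "processing"},
    "processing": {"complete": "completed", "error": "failed"},
    "completed": {"restart": "initial"},
    "failed": {"retry": "processing", "abort": "aborted"},
}


def next_state(current: str, event: str):
    """Return the state reached from `current` on `event`, or None if invalid."""
    return TRANSITIONS.get(current, {}).get(event)


def run_events(events, start="initial"):
    """Apply events in order, ignoring any that are invalid for the current state."""
    state = start
    for event in events:
        target = next_state(state, event)
        if target is not None:
            state = target
    return state


final_state = run_events(["start", "error", "retry", "complete"])
print(final_state)
```

Note that "aborted" has no outgoing transitions in this table, so once a workflow reaches it every further event is rejected.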

Global Configuration Management

Manage system-wide settings with global memory.

@app.reasoner
async def get_tier_limit(tier: str) -> int:
    # Get global tier limits
    limits = await app.memory.global_scope.get("tier_limits", default={
        "free": 100,
        "premium": 1000,
        "enterprise": 10000
    })

    return limits.get(tier, 100)

@app.reasoner
async def update_feature_flags(flags: dict):
    # Update global feature flags
    current_flags = await app.memory.global_scope.get("feature_flags", default={})
    current_flags.update(flags)

    await app.memory.global_scope.set("feature_flags", current_flags)

    return {"updated": True, "flags": current_flags}

@app.reasoner
async def check_feature_enabled(feature: str) -> bool:
    # Check if feature is enabled globally
    flags = await app.memory.global_scope.get("feature_flags", default={})
    return flags.get(feature, False)

Actor-Specific Caching

Use actor memory for agent-private caching.

from datetime import datetime, timedelta

@app.reasoner
async def get_with_cache(key: str, fetch_fn):
    # Get actor memory for this agent instance
    actor_mem = app.memory.actor(app.node_id)

    # Check cache
    cache_key = f"cache_{key}"
    cached = await actor_mem.get(cache_key)

    if cached:
        # Check if cache is still valid
        if datetime.fromisoformat(cached["expires_at"]) > datetime.now():
            return cached["data"]

    # Cache miss or expired - fetch new data
    data = await fetch_fn()

    # Store in cache with expiration
    await actor_mem.set(cache_key, {
        "data": data,
        "expires_at": (datetime.now() + timedelta(minutes=30)).isoformat()
    })

    return data

Event Subscriptions

Subscribe to memory changes with pattern-based event listeners. For comprehensive event system documentation, see Memory Events.

Basic Event Subscription

# Subscribe to customer data changes
@app.memory.on_change("customer_*")
async def handle_customer_changes(event):
    print(f"Customer {event.key} changed: {event.action}")

    if event.action == "set":
        # Process new customer data
        customer_id = event.key.replace("customer_", "")
        await notify_customer_team(customer_id, event.data)

# Multiple patterns
@app.memory.on_change(["user_*.preferences", "user_*.settings"])
async def handle_user_config(event):
    user_id = event.key.split('_')[1].split('.')[0]
    await sync_user_configuration(user_id)
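
To make the pattern matching concrete, here is a toy dispatcher. It assumes glob-style semantics where `*` matches any run of characters, implemented with Python's `fnmatch`; Agentfield's actual matching rules may differ. `ToyEventBus` and its dict-shaped events are hypothetical stand-ins, not the real event client.

```python
import asyncio
from fnmatch import fnmatchcase


class ToyEventBus:
    """Hypothetical stand-in for a pattern-based change dispatcher."""

    def __init__(self):
        self._handlers = []  # list of (patterns, handler) pairs

    def on_change(self, patterns):
        # Accept a single pattern or a list of patterns.
        if isinstance(patterns, str):
            patterns = [patterns]

        def register(handler):
            self._handlers.append((tuple(patterns), handler))
            return handler

        return register

    async def publish(self, key, action, data=None):
        event = {"key": key, "action": action, "data": data}
        for patterns, handler in self._handlers:
            if any(fnmatchcase(key, p) for p in patterns):
                await handler(event)


bus = ToyEventBus()
seen = []


@bus.on_change("customer_*")
async def on_customer(event):
    seen.append(("customer", event["key"]))


@bus.on_change(["user_*.preferences", "user_*.settings"])
async def on_user_config(event):
    seen.append(("user", event["key"]))


async def main():
    await bus.publish("customer_42", "set", {"tier": "premium"})
    await bus.publish("user_7.settings", "set", {"theme": "dark"})
    await bus.publish("order_9.status", "set", "shipped")  # matches nothing


asyncio.run(main())
print(seen)
```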

Scoped Event Subscriptions

# Session-scoped events
session_mem = app.memory.session("user_123")

@session_mem.on_change("temp_*")
async def handle_session_temp_data(event):
    # Only receives events for temp_* keys in session user_123
    await process_temporary_data(event)

Event Object

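The handlers above read the following attributes from the event object:

  • key - the memory key that changed
  • action - the operation that triggered the event (for example, "set")
  • data - the value associated with the change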

For advanced event patterns including history replay, pattern matching, and WebSocket subscriptions, see Memory Events.

Storage & Scalability

Agentfield's memory system supports two storage backends optimized for different deployment scenarios.

Storage Backends

Local Development (BoltDB)

  • Embedded key-value store for single-server deployments
  • Zero configuration - works out of the box
  • File-based persistence (agentfield.db)
  • Ideal for development, testing, and single-agent deployments

Production (PostgreSQL)

  • Distributed database for multi-server deployments
  • JSONB columns for flexible data storage
  • Automatic indexing on scope, scope_id, and key
  • Supports horizontal scaling across multiple Agentfield servers

Storage backend is configured in Agentfield server settings. See Deployment Guide for production database setup and configuration.

Persistence & Durability

  • Immediate Persistence: All memory operations are immediately written to disk
  • Crash Recovery: Survives agent restarts, server failures, and deployments
  • Data Integrity: ACID guarantees (PostgreSQL) or file-level consistency (BoltDB)

Scalability Characteristics

  • Horizontal Scaling: Multiple Agentfield server instances share the same PostgreSQL database
  • Read Performance: Sub-10ms latency for typical get operations
  • Write Performance: Sub-20ms latency for set operations with immediate persistence
  • Concurrent Access: PostgreSQL handles concurrent reads/writes across distributed agents
  • Memory Limits: No hard limits - constrained only by PostgreSQL storage capacity

Production Considerations

# Memory operations are async and non-blocking
await app.memory.set("large_dataset", data)  # ~20ms

# Hierarchical lookup adds minimal overhead
value = await app.memory.get("key")  # ~10ms (single scope)
value = await app.memory.get("key")  # ~15ms (searches 2-3 scopes)

# Event subscriptions use WebSocket connections
# Each agent maintains one WebSocket per event client
# Minimal overhead: ~1-2ms per event delivery
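
The latency figures above are indicative, so it can be worth measuring in your own deployment. The helper below is a small sketch: `timed` is a hypothetical wrapper, and `fake_memory_get` stands in for a real call such as app.memory.get() so the example runs on its own.

```python
import asyncio
import time


async def timed(coro):
    """Await `coro` and return (result, elapsed_milliseconds)."""
    start = time.perf_counter()
    result = await coro
    elapsed_ms = (time.perf_counter() - start) * 1000
    return result, elapsed_ms


async def fake_memory_get(key):
    # Hypothetical stand-in for a memory read; sleeps to simulate I/O.
    await asyncio.sleep(0.01)
    return f"value-for-{key}"


value, elapsed_ms = asyncio.run(timed(fake_memory_get("key")))
print(f"{value} in {elapsed_ms:.1f} ms")
```

Inside a reasoner the same wrapper could be applied as `await timed(app.memory.get("key"))`.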

For production deployment, database configuration, and scaling strategies, see Deployment Guide.

Best Practices

Key Naming Conventions

Use hierarchical, descriptive key names for better organization and pattern matching.

# ✅ Good - hierarchical and descriptive
await app.memory.set("customer_123.profile", profile_data)
await app.memory.set("customer_123.preferences", prefs)
await app.memory.set("order_456.status", "shipped")
await app.memory.set("order_456.payment", payment_info)

# ❌ Bad - flat and ambiguous
await app.memory.set("c123p", profile_data)
await app.memory.set("data", prefs)
await app.memory.set("status", "shipped")
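
Building keys through one helper keeps the `entity_id.field` shape consistent. The functions below are a hypothetical sketch, not part of Agentfield, and assume that entity names contain no underscore and that IDs contain no dot.

```python
def make_key(entity: str, entity_id: str, field: str) -> str:
    """Build a hierarchical key such as 'customer_123.profile'."""
    return f"{entity}_{entity_id}.{field}"


def parse_key(key: str):
    """Split a hierarchical key back into (entity, entity_id, field)."""
    prefix, _, field = key.partition(".")
    entity, _, entity_id = prefix.partition("_")
    return entity, entity_id, field


keys = [
    make_key("customer", "123", "profile"),
    make_key("customer", "123", "preferences"),
    make_key("order", "456", "status"),
]

# A shared prefix makes it easy to select everything for one entity.
customer_keys = [k for k in keys if k.startswith("customer_123.")]
parsed = parse_key("order_456.status")
```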

Scope Selection Guidelines

Choose the appropriate scope based on data lifetime and sharing requirements.

# Workflow - task coordination (short-lived, multi-agent)
await app.memory.set("current_step", "processing")

# Session - user data (medium-lived, user-specific)
session_mem = app.memory.session(user_id)
await session_mem.set("preferences", user_prefs)

# Actor - agent state (long-lived, agent-private)
actor_mem = app.memory.actor(app.node_id)
await actor_mem.set("internal_cache", cache_data)

# Global - system config (permanent, system-wide)
await app.memory.global_scope.set("api_version", "2.1.0")

Error Handling

Always handle potential memory operation failures gracefully.

@app.reasoner
async def safe_memory_operations(key: str, data: dict):
    try:
        await app.memory.set(key, data)
        return {"status": "success"}
    except Exception as e:
        # Log error and use fallback
        print(f"Memory operation failed: {e}")
        await store_locally(key, data)
        return {"status": "fallback", "error": str(e)}

@app.reasoner
async def robust_memory_get(key: str):
    try:
        value = await app.memory.get(key)
        if value is not None:
            return {"value": value, "source": "memory"}
    except Exception as e:
        print(f"Memory get failed: {e}")

    # Fallback to default
    default_value = await get_default_value(key)
    return {"value": default_value, "source": "default"}
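
For transient failures, retrying before falling back is a common complement to the try/except pattern above. The sketch below assumes failures surface as ordinary exceptions; `with_retries` is a hypothetical helper, and `flaky_set` simulates a write that fails twice before succeeding.

```python
import asyncio


async def with_retries(operation, attempts=3, base_delay=0.01):
    """Call the async `operation` until it succeeds or `attempts` run out."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: let the caller handle it
            # Exponential backoff: base_delay, then 2x, then 4x, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


calls = {"count": 0}


async def flaky_set():
    # Hypothetical stand-in for a memory write with transient failures.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return "stored"


outcome = asyncio.run(with_retries(flaky_set))
print(outcome, calls["count"])
```

In a reasoner the operation could be passed as `lambda: app.memory.set(key, data)`, with the local fallback kept for the case where every attempt fails.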

Memory Lifecycle Management

Clean up temporary data to prevent unbounded growth.

from datetime import datetime, timedelta

@app.reasoner
async def cleanup_expired_data(workflow_id: str):
    """Clean up expired temporary data."""
    # Get all temporary keys in workflow using scoped client
    workflow_mem = app.memory.workflow(workflow_id)
    workflow_keys = await workflow_mem.list_keys()
    temp_keys = [k for k in workflow_keys if k.startswith("temp_")]

    now = datetime.now()
    cleaned = 0
    for key in temp_keys:
        data = await workflow_mem.get(key)
        if data and "expires_at" in data:
            if datetime.fromisoformat(data["expires_at"]) < now:
                await workflow_mem.delete(key)
                cleaned += 1

    # Report only the keys that were actually deleted
    return {"cleaned_keys": cleaned}

@app.reasoner
async def set_with_expiration(key: str, data: dict, ttl_minutes: int = 60):
    """Set data with automatic expiration tracking."""
    expires_at = datetime.now() + timedelta(minutes=ttl_minutes)

    temp_data = {
        "data": data,
        "expires_at": expires_at.isoformat(),
        "created_by": app.node_id
    }

    await app.memory.set(f"temp_{key}", temp_data)
    return {"expires_at": expires_at.isoformat()}
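
The expiry check itself can be isolated into pure functions that take the current time as an argument, which makes it testable without a memory backend. `wrap_with_expiry` and `is_expired` are hypothetical helpers that follow the record shape used above.

```python
from datetime import datetime, timedelta


def wrap_with_expiry(data, ttl_minutes, now):
    """Wrap `data` in a record carrying an ISO-8601 expiry timestamp."""
    expires_at = now + timedelta(minutes=ttl_minutes)
    return {"data": data, "expires_at": expires_at.isoformat()}


def is_expired(record, now):
    """Return True if the record has an expiry timestamp earlier than `now`."""
    expires_at = record.get("expires_at")
    if expires_at is None:
        return False  # records without an expiry never expire
    return datetime.fromisoformat(expires_at) < now


now = datetime(2025, 1, 15, 12, 0, 0)
record = wrap_with_expiry({"step": 1}, ttl_minutes=60, now=now)

fresh = is_expired(record, now)
stale = is_expired(record, now + timedelta(minutes=61))
```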

Performance Optimization

Batch related operations and use appropriate scoping.

# ✅ Good - batch related data
customer_data = {
    "profile": await app.memory.get("customer_profile"),
    "preferences": await app.memory.get("customer_preferences"),
    "history": await app.memory.get("customer_history")
}

# ✅ Good - use specific scope when known
session_mem = app.memory.session(session_id)
prefs = await session_mem.get("preferences")  # Faster - single scope

# ❌ Avoid - unnecessary hierarchical lookups
prefs = await app.memory.get("preferences")  # Slower - searches all scopes
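
When several independent keys are needed, the reads can also be issued concurrently with `asyncio.gather` instead of awaiting them one at a time. This is a sketch that assumes the reads do not depend on each other; `fake_get` and `STORE` are hypothetical stand-ins for app.memory.get() and its backing storage.

```python
import asyncio

STORE = {
    "customer_profile": {"id": "cust_123"},
    "customer_preferences": {"notifications": True},
}


async def fake_get(key, default=None):
    # Hypothetical stand-in for a memory read; sleeps to simulate I/O.
    await asyncio.sleep(0.01)
    return STORE.get(key, default)


async def load_customer():
    # All three reads are in flight at once; results come back in call order.
    profile, preferences, history = await asyncio.gather(
        fake_get("customer_profile"),
        fake_get("customer_preferences"),
        fake_get("customer_history", default=[]),
    )
    return {"profile": profile, "preferences": preferences, "history": history}


customer_data = asyncio.run(load_customer())
print(customer_data)
```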

  • Memory Events - Event system deep dive with pattern matching and history
  • @app.reasoner - Use memory within reasoners for state management
  • app.call() - Cross-agent calls with automatic memory context
  • Execution Context - Understanding automatic scope resolution
  • Deployment - Production database configuration and scaling