app.memory
Shared state across distributed agents with automatic scoping and real-time events
Persistent memory system that enables seamless state sharing across distributed agents. Automatically scoped to workflow, session, actor, or global contexts with zero configuration.
Zero-Config Advantage: Unlike traditional agent frameworks that require manual vector stores or shared state management, Agentfield's memory automatically scopes to execution context. No setup, no key management, no coordination code.
Basic Example
from agentfield import Agent
app = Agent(node_id="autonomous_recruiter")
@app.reasoner
async def intelligent_candidate_screening(resume: dict, job_id: str):
"""AI screens candidates autonomously, learning from hiring patterns.
Replaces: Manual resume review, keyword matching systems,
and rigid applicant tracking workflows.
"""
# AI analyzes candidate fit - no hardcoded criteria
screening_analysis = await app.ai(
system="You are an expert recruiter. Analyze candidate fit considering skills, experience, culture, and growth potential.",
user=f"Resume: {resume}\nJob requirements: {await get_job_requirements(job_id)}",
schema=CandidateAnalysis
)
# Store AI's reasoning in workflow memory for other agents
await app.memory.set(f"candidate_{resume['id']}_analysis", {
"fit_score": screening_analysis.fit_score,
"strengths": screening_analysis.strengths,
"concerns": screening_analysis.concerns,
"ai_recommendation": screening_analysis.recommendation,
"reasoning": screening_analysis.detailed_reasoning
})
# Get hiring manager preferences from session memory
session_mem = app.memory.session(f"hiring_manager_{job_id}")
manager_preferences = await session_mem.get("screening_preferences", default={
"min_fit_score": 70,
"auto_advance_threshold": 85
})
# AI decides next action autonomously
if screening_analysis.fit_score >= manager_preferences["auto_advance_threshold"]:
# High-quality candidate - AI autonomously schedules interview
await app.memory.set(f"interview_scheduled_{resume['id']}", {
"candidate_id": resume['id'],
"job_id": job_id,
"scheduled_by": "ai_autonomous",
"fit_score": screening_analysis.fit_score
})
return {"decision": "auto_advance", "fit_score": screening_analysis.fit_score}
elif screening_analysis.fit_score >= manager_preferences["min_fit_score"]:
# Good candidate - flag for human review
await app.memory.set(f"review_queue_{job_id}", {
"candidate_id": resume['id'],
"priority": "medium",
"ai_analysis": screening_analysis
})
return {"decision": "human_review", "fit_score": screening_analysis.fit_score}
else:
# Not a fit - AI sends personalized rejection
return {"decision": "reject", "fit_score": screening_analysis.fit_score}What are Agent A and Agent B?
In Agentfield, "Agent A" and "Agent B" refer to distributed agent nodes running as separate processes (like microservices), each connected to the same Agentfield control plane. This architecture enables true distributed agent systems where agents can seamlessly share state without manual coordination.
Learn more about distributed agent nodes in the Agent Node documentation.
# Agent A (support-agent) - Distributed node on server 1
@app.reasoner
async def analyze_ticket(ticket_id: str):
await app.memory.set("ticket_priority", "high")
await app.memory.set("ticket_category", "billing")
# Call specialist agent on different server
result = await app.call(
"specialist-agent.handle_billing",
ticket_id=ticket_id
)
return result

# Agent B (specialist-agent) - Distributed node on server 2
@app.reasoner
async def handle_billing(ticket_id: str):
# Automatically accesses Agent A's workflow memory
priority = await app.memory.get("ticket_priority") # "high"
category = await app.memory.get("ticket_category") # "billing"
# Both agents share workflow context - zero config
return {"priority": priority, "category": category}1. Agent A (server 1) sets workflow memory
2. Agent A calls Agent B via app.call()
3. Agentfield propagates execution context automatically
4. Agent B (server 2) reads same workflow memory
5. No manual key coordination or message passing needed
This works because both agents connect to the same Agentfield control plane, which manages memory and context.

Scope Hierarchy
Memory automatically resolves through four hierarchical scopes. When you call app.memory.get(), Agentfield searches in order until it finds the key.
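For instance, a key set in a narrower scope shadows the same key in a wider scope. The sketch below is illustrative (the key name and values are made up) and uses only the set/get calls documented on this page:

# A workflow-level value shadows a global default for the same key
await app.memory.global_scope.set("request_timeout", 30)  # system-wide default
await app.memory.set("request_timeout", 5)                # override for this workflow

# Lookup checks workflow → session → actor → global,
# so inside this workflow the override wins:
timeout = await app.memory.get("request_timeout")         # → 5

# In an unrelated workflow the key is absent from the narrower scopes,
# so the same call falls through to the global value (30).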
When to Use Each Scope
Multi-agent task coordination - Automatically scoped to the current workflow execution.
When to use:
- Passing data between agents in a multi-step workflow
- Tracking progress through complex agent orchestrations
- Sharing intermediate results during task execution
- Coordinating state across distributed agent calls
Application examples:
- Customer support ticket routing (priority, category, customer context)
- Document processing pipelines (extracted data, validation status)
- Multi-agent research tasks (findings, sources, synthesis state)
- Order fulfillment workflows (inventory checks, payment status, shipping info)
Lifetime: Exists only for the duration of the workflow execution. Automatically cleaned up when workflow completes.
# Workflow memory is automatic - no scope needed
await app.memory.set("processing_stage", "validation")
await app.memory.set("extracted_entities", entities)User session persistence - Maintains state across multiple workflow executions for the same user.
When to use:
- Storing user preferences and settings
- Maintaining conversation history across interactions
- Tracking user journey and behavior patterns
- Caching user-specific data for performance
Application examples:
- Chatbot conversation history (messages, context, user preferences)
- E-commerce shopping sessions (cart, browsing history, recommendations)
- Multi-turn form filling (partial data, validation state)
- Personalized agent behavior (user's preferred communication style)
Lifetime: Persists until explicitly deleted. Survives workflow completions and agent restarts.
# Session memory requires explicit session ID
session_mem = app.memory.session(user_id)
await session_mem.set("conversation_history", messages)
await session_mem.set("user_preferences", prefs)Agent-private state - Isolated to a specific agent instance, not shared with other agents.
When to use:
- Agent-specific configuration and settings
- Internal caching for performance optimization
- Rate limiting and quota tracking per agent
- Agent health metrics and diagnostics
Application examples:
- API rate limit tracking (requests per minute, quota remaining)
- Agent-specific model configurations (temperature, max tokens)
- Internal performance metrics (response times, error rates)
- Private caching of frequently accessed data
Lifetime: Persists across workflow executions. Tied to the agent instance identifier.
# Actor memory requires explicit actor ID
actor_mem = app.memory.actor(app.node_id)
await actor_mem.set("api_quota_remaining", 950)
await actor_mem.set("cache_hit_rate", 0.85)System-wide configuration - Shared across all agents, workflows, and sessions.
When to use:
- Application-wide configuration and feature flags
- Shared lookup tables and reference data
- System-wide counters and metrics
- Global rate limits and quotas
Application examples:
- Feature flags (enable/disable features across all agents)
- Pricing tiers and limits (free: 100, premium: 1000)
- API endpoints and service URLs
- System-wide rate limits and circuit breakers
Lifetime: Permanent until explicitly deleted. Survives all restarts and deployments.
# Global memory is accessed via global_scope
await app.memory.global_scope.set("feature_flags", {
"new_ui": True,
"beta_api": False
})
await app.memory.global_scope.set("tier_limits", limits)Scope Selection Quick Reference:
Prop
Type
Scope ID Patterns
# Workflow scope - automatic from execution context
# ID format: wf-{uuid} (e.g., "wf-abc123def456")
await app.memory.set("task_status", "processing")
# Session scope - user/session identifier
# ID format: Your choice (e.g., "user_12345", "session_abc")
session_mem = app.memory.session("user_12345")
await session_mem.set("preferences", {"theme": "dark"})
# Actor scope - agent instance identifier
# ID format: {agent_node_id} or custom (e.g., "support-agent-001")
actor_mem = app.memory.actor("support-agent-001")
await actor_mem.set("queue_position", 3)
# Global scope - no ID needed
await app.memory.global_scope.set("api_version", "2.1.0")Production Tip: Use consistent ID patterns across your system. For sessions, use your auth system's user IDs. For actors, use agent node IDs or instance identifiers from your orchestration platform.
Core Operations
set()
Store a value with automatic scoping.
# Automatic scoping (uses current workflow)
await app.memory.set("customer_tier", "premium")
# Complex data structures
await app.memory.set("customer_profile", {
"id": "cust_123",
"tier": "premium",
"preferences": {"notifications": True}
})
# For other scopes, use scoped clients
await app.memory.global_scope.set("global_config", config_data)
await app.memory.session("user_123").set("preferences", prefs)get()
Retrieve a value with hierarchical lookup.
# Hierarchical lookup (workflow → session → actor → global)
tier = await app.memory.get("customer_tier", default="standard")
# With default value
preferences = await app.memory.get("user_preferences", default={
"theme": "light",
"notifications": True
})
# For specific scope lookup, use scoped clients
global_config = await app.memory.global_scope.get("api_config")
session_prefs = await app.memory.session("user_123").get("preferences")

exists()
Check if a key exists in any scope.
# Check before accessing
if await app.memory.exists("customer_history"):
history = await app.memory.get("customer_history")
else:
history = []
# Check in specific scope
if await app.memory.global_scope.exists("feature_flags"):
flags = await app.memory.global_scope.get("feature_flags")

delete()
Remove a value from memory.
# Delete from current scope
await app.memory.delete("temp_session_data")
# Delete from specific scope
await app.memory.session("user_123").delete("cached_results")list_keys()
List all keys in a specific scope. Use scoped memory clients to list keys.
# List session keys
session_mem = app.memory.session("user_123")
session_keys = await session_mem.list_keys()
# List workflow keys
workflow_mem = app.memory.workflow("wf-abc123")
workflow_keys = await workflow_mem.list_keys()
# List global keys
global_keys = await app.memory.global_scope.list_keys()

Scoped Memory Clients
Access memory in specific scopes with dedicated clients.
session()
Session-scoped memory for user-specific data.
# Get session client
session_mem = app.memory.session("user_12345")
# Session operations
await session_mem.set("preferences", {"theme": "dark"})
prefs = await session_mem.get("preferences")
await session_mem.delete("temp_data")
keys = await session_mem.list_keys()
# Use in reasoners
@app.reasoner
async def update_user_settings(user_id: str, settings: dict):
session_mem = app.memory.session(user_id)
await session_mem.set("settings", settings)
return {"updated": True}actor()
Actor-scoped memory for agent-private state.
# Get actor client
actor_mem = app.memory.actor("support-agent-001")
# Private agent state
await actor_mem.set("queue_position", 5)
await actor_mem.set("internal_cache", {"last_sync": "2025-01-15"})
# Agent instance configuration
@app.reasoner
async def configure_agent(agent_id: str, config: dict):
actor_mem = app.memory.actor(agent_id)
await actor_mem.set("config", config)
return {"configured": agent_id}workflow()
Workflow-scoped memory for explicit workflow access.
# Get workflow client
workflow_mem = app.memory.workflow("wf-abc123")
# Workflow state management
await workflow_mem.set("current_step", "processing")
await workflow_mem.set("results", {"step1": "complete"})
# Cross-workflow access
@app.reasoner
async def check_workflow_status(workflow_id: str):
workflow_mem = app.memory.workflow(workflow_id)
status = await workflow_mem.get("current_step")
return {"workflow_id": workflow_id, "status": status}global_scope
Global memory for system-wide shared data.
# Global configuration
await app.memory.global_scope.set("api_version", "2.1.0")
await app.memory.global_scope.set("feature_flags", {
"new_ui": True,
"beta_features": False
})
# Global counters
current_count = await app.memory.global_scope.get("request_count", default=0)
await app.memory.global_scope.set("request_count", current_count + 1)
# System-wide lookup tables
await app.memory.global_scope.set("tier_limits", {
"free": 100,
"premium": 1000,
"enterprise": 10000
})

Common Patterns
Cross-Agent Memory Sharing
Agents automatically share workflow memory through execution context.
# Agent A: Customer Service
@app.reasoner
async def handle_inquiry(customer_id: str, inquiry: str):
# Set workflow context
await app.memory.set("customer_id", customer_id)
await app.memory.set("inquiry_type", "technical")
await app.memory.set("priority", "high")
# Call specialist agent
result = await app.call(
"technical-agent.analyze_issue",
inquiry=inquiry
)
return result
# Agent B: Technical Specialist (different server)
@app.reasoner
async def analyze_issue(inquiry: str):
# Access workflow memory set by Agent A
customer_id = await app.memory.get("customer_id")
priority = await app.memory.get("priority") # "high"
# Add analysis to shared workflow memory
await app.memory.set("analysis_result", {
"category": "bug",
"severity": "critical",
"estimated_fix": "2 hours"
})
return {"customer_id": customer_id, "priority": priority}Execution context propagates automatically through app.call(). No manual
header management or context passing required.
Session Persistence Across Workflows
Maintain user state across multiple workflow executions.
@app.reasoner
async def chat_with_history(user_id: str, message: str):
# Get session memory for this user
session_mem = app.memory.session(user_id)
# Retrieve conversation history (persists across workflows)
history = await session_mem.get("conversation_history", default=[])
# Add new message
history.append({
"role": "user",
"content": message,
"timestamp": datetime.now().isoformat()
})
# Generate response with history context
response = await app.ai(
system="You are a helpful assistant.",
user=f"History: {history[-5:]}\nNew message: {message}"
)
# Update history with response
history.append({
"role": "assistant",
"content": response,
"timestamp": datetime.now().isoformat()
})
# Save back to session memory
await session_mem.set("conversation_history", history)
return response

Conditional Memory Updates
Update memory only when conditions are met.
@app.reasoner
async def update_customer_tier(customer_id: str, new_tier: str):
# Get current customer data
customer = await app.memory.get(f"customer_{customer_id}")
if customer and customer.get("tier") != new_tier:
# Capture the previous tier before overwriting it
old_tier = customer.get("tier")
# Update tier and track change
customer["tier"] = new_tier
customer["tier_updated_at"] = datetime.now().isoformat()
await app.memory.set(f"customer_{customer_id}", customer)
# Set notification flag
await app.memory.set(f"tier_change_{customer_id}", {
"old_tier": old_tier,
"new_tier": new_tier,
"timestamp": datetime.now().isoformat()
})
return {"updated": True, "tier": new_tier}
return {"updated": False, "tier": customer.get("tier")}Memory-Based State Machines
Implement workflow state transitions with memory.
@app.reasoner
async def advance_workflow_state(event: str):
# Get current state
current_state = await app.memory.get("workflow_state", default="initial")
# Define state transitions
transitions = {
"initial": {"start": "processing"},
"processing": {"complete": "completed", "error": "failed"},
"completed": {"restart": "initial"},
"failed": {"retry": "processing", "abort": "aborted"}
}
# Check if transition is valid
if current_state in transitions and event in transitions[current_state]:
new_state = transitions[current_state][event]
# Update state
await app.memory.set("workflow_state", new_state)
# Track state history
history = await app.memory.get("state_history", default=[])
history.append({
"from": current_state,
"to": new_state,
"event": event,
"timestamp": datetime.now().isoformat()
})
await app.memory.set("state_history", history)
return {"transitioned": True, "new_state": new_state}
return {"transitioned": False, "current_state": current_state}Global Configuration Management
Manage system-wide settings with global memory.
@app.reasoner
async def get_tier_limit(tier: str) -> int:
# Get global tier limits
limits = await app.memory.global_scope.get("tier_limits", default={
"free": 100,
"premium": 1000,
"enterprise": 10000
})
return limits.get(tier, 100)
@app.reasoner
async def update_feature_flags(flags: dict):
# Update global feature flags
current_flags = await app.memory.global_scope.get("feature_flags", default={})
current_flags.update(flags)
await app.memory.global_scope.set("feature_flags", current_flags)
return {"updated": True, "flags": current_flags}
@app.reasoner
async def check_feature_enabled(feature: str) -> bool:
# Check if feature is enabled globally
flags = await app.memory.global_scope.get("feature_flags", default={})
return flags.get(feature, False)

Actor-Specific Caching
Use actor memory for agent-private caching.
@app.reasoner
async def get_with_cache(key: str, fetch_fn):
# Get actor memory for this agent instance
actor_mem = app.memory.actor(app.node_id)
# Check cache
cache_key = f"cache_{key}"
cached = await actor_mem.get(cache_key)
if cached:
# Check if cache is still valid
if datetime.fromisoformat(cached["expires_at"]) > datetime.now():
return cached["data"]
# Cache miss or expired - fetch new data
data = await fetch_fn()
# Store in cache with expiration
await actor_mem.set(cache_key, {
"data": data,
"expires_at": (datetime.now() + timedelta(minutes=30)).isoformat()
})
return data

Event Subscriptions
Subscribe to memory changes with pattern-based event listeners. For comprehensive event system documentation, see Memory Events.
Basic Event Subscription
# Subscribe to customer data changes
@app.memory.on_change("customer_*")
async def handle_customer_changes(event):
print(f"Customer {event.key} changed: {event.action}")
if event.action == "set":
# Process new customer data
customer_id = event.key.replace("customer_", "")
await notify_customer_team(customer_id, event.data)
# Multiple patterns
@app.memory.on_change(["user_*.preferences", "user_*.settings"])
async def handle_user_config(event):
user_id = event.key.split('_')[1].split('.')[0]
await sync_user_configuration(user_id)

Scoped Event Subscriptions
# Session-scoped events
session_mem = app.memory.session("user_123")
@session_mem.on_change("temp_*")
async def handle_session_temp_data(event):
# Only receives events for temp_* keys in session user_123
await process_temporary_data(event)

Event Object
The handlers above use the event object's key, action, and data attributes. For advanced event patterns including history replay, pattern matching, and WebSocket subscriptions, see Memory Events.
Storage & Scalability
Agentfield's memory system supports two storage backends optimized for different deployment scenarios.
Storage Backends
Local Development (BoltDB)
- Embedded key-value store for single-server deployments
- Zero configuration - works out of the box
- File-based persistence (agentfield.db)
- Ideal for development, testing, and single-agent deployments
Production (PostgreSQL)
- Distributed database for multi-server deployments
- JSONB columns for flexible data storage
- Automatic indexing on scope, scope_id, and key
- Supports horizontal scaling across multiple Agentfield servers
Storage backend is configured in Agentfield server settings. See Deployment Guide for production database setup and configuration.
Persistence & Durability
- Immediate Persistence: All memory operations are immediately written to disk
- Crash Recovery: Survives agent restarts, server failures, and deployments
- Data Integrity: ACID guarantees (PostgreSQL) or file-level consistency (BoltDB)
Scalability Characteristics
- Horizontal Scaling: Multiple Agentfield server instances share the same PostgreSQL database
- Read Performance: Sub-10ms latency for typical get operations
- Write Performance: Sub-20ms latency for set operations with immediate persistence (see the spot-check sketch after this list)
- Concurrent Access: PostgreSQL handles concurrent reads/writes across distributed agents
- Memory Limits: No hard limits - constrained only by PostgreSQL storage capacity
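If you want to verify these figures against your own deployment, a rough spot-check can be written with standard timing around the documented set/get calls. The reasoner below is a minimal sketch; the probe key names and sample count are arbitrary:

import time

@app.reasoner
async def measure_memory_latency(samples: int = 50):
    """Rough latency spot-check for memory set/get operations."""
    set_times, get_times = [], []
    for i in range(samples):
        start = time.perf_counter()
        await app.memory.set(f"latency_probe_{i}", {"sample": i})
        set_times.append(time.perf_counter() - start)

        start = time.perf_counter()
        await app.memory.get(f"latency_probe_{i}")
        get_times.append(time.perf_counter() - start)

    return {
        "avg_set_ms": round(sum(set_times) / samples * 1000, 2),
        "avg_get_ms": round(sum(get_times) / samples * 1000, 2),
    }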
Production Considerations
# Memory operations are async and non-blocking
await app.memory.set("large_dataset", data) # ~20ms
# Hierarchical lookup adds minimal overhead
value = await app.memory.get("key") # ~10ms (single scope)
value = await app.memory.get("key") # ~15ms (searches 2-3 scopes)
# Event subscriptions use WebSocket connections
# Each agent maintains one WebSocket per event client
# Minimal overhead: ~1-2ms per event delivery

For production deployment, database configuration, and scaling strategies, see Deployment Guide.
Best Practices
Key Naming Conventions
Use hierarchical, descriptive key names for better organization and pattern matching.
# ✅ Good - hierarchical and descriptive
await app.memory.set("customer_123.profile", profile_data)
await app.memory.set("customer_123.preferences", prefs)
await app.memory.set("order_456.status", "shipped")
await app.memory.set("order_456.payment", payment_info)
# ❌ Bad - flat and ambiguous
await app.memory.set("c123p", profile_data)
await app.memory.set("data", prefs)
await app.memory.set("status", "shipped")Scope Selection Guidelines
Choose the appropriate scope based on data lifetime and sharing requirements.
# Workflow - task coordination (short-lived, multi-agent)
await app.memory.set("current_step", "processing")
# Session - user data (medium-lived, user-specific)
session_mem = app.memory.session(user_id)
await session_mem.set("preferences", user_prefs)
# Actor - agent state (long-lived, agent-private)
actor_mem = app.memory.actor(app.node_id)
await actor_mem.set("internal_cache", cache_data)
# Global - system config (permanent, system-wide)
await app.memory.global_scope.set("api_version", "2.1.0")Error Handling
Always handle potential memory operation failures gracefully.
@app.reasoner
async def safe_memory_operations(key: str, data: dict):
try:
await app.memory.set(key, data)
return {"status": "success"}
except Exception as e:
# Log error and use fallback
print(f"Memory operation failed: {e}")
await store_locally(key, data)
return {"status": "fallback", "error": str(e)}
@app.reasoner
async def robust_memory_get(key: str):
try:
value = await app.memory.get(key)
if value is not None:
return {"value": value, "source": "memory"}
except Exception as e:
print(f"Memory get failed: {e}")
# Fallback to default
default_value = await get_default_value(key)
return {"value": default_value, "source": "default"}Memory Lifecycle Management
Clean up temporary data to prevent unbounded growth.
@app.reasoner
async def cleanup_expired_data(workflow_id: str):
"""Clean up expired temporary data."""
# Get all temporary keys in workflow using scoped client
workflow_mem = app.memory.workflow(workflow_id)
workflow_keys = await workflow_mem.list_keys()
temp_keys = [k for k in workflow_keys if k.startswith("temp_")]
deleted = 0
for key in temp_keys:
data = await workflow_mem.get(key)
if data and "expires_at" in data:
if datetime.fromisoformat(data["expires_at"]) < datetime.now():
await workflow_mem.delete(key)
deleted += 1
return {"cleaned_keys": deleted}
@app.reasoner
async def set_with_expiration(key: str, data: dict, ttl_minutes: int = 60):
"""Set data with automatic expiration tracking."""
expires_at = datetime.now() + timedelta(minutes=ttl_minutes)
temp_data = {
"data": data,
"expires_at": expires_at.isoformat(),
"created_by": app.node_id
}
await app.memory.set(f"temp_{key}", temp_data)
return {"expires_at": expires_at.isoformat()}Performance Optimization
Batch related operations and use appropriate scoping.
# ✅ Good - batch related data
customer_data = {
"profile": await app.memory.get("customer_profile"),
"preferences": await app.memory.get("customer_preferences"),
"history": await app.memory.get("customer_history")
}
# ✅ Good - use specific scope when known
session_mem = app.memory.session(session_id)
prefs = await session_mem.get("preferences") # Faster - single scope
# ❌ Avoid - unnecessary hierarchical lookups
prefs = await app.memory.get("preferences") # Slower - searches all scopesRelated
- Memory Events - Event system deep dive with pattern matching and history
- @app.reasoner - Use memory within reasoners for state management
- app.call() - Cross-agent calls with automatic memory context
- Execution Context - Understanding automatic scope resolution
- Deployment - Production database configuration and scaling