Trigger agents on memory changes
Subscribe a reasoner to a memory key pattern. When any agent writes to a matching key, the reasoner fires.
This lets you build reactive multi-agent pipelines without an orchestrator: stage 1 writes a result, a handler in stage 2 fires automatically, and stage 3 fires on stage 2's write. The chain composes itself.
from agentfield import Agent

app = Agent(node_id="pipeline-coordinator")


# Stage 1: classifier writes the document type
@app.reasoner()
async def classify(document: str, doc_id: str) -> dict:
    classification = await app.ai(system="Classify this document.", user=document)
    await app.memory.global_scope.set(
        f"pipeline.classified.{doc_id}",
        {"type": str(classification), "document": document},
    )
    return {"doc_id": doc_id}


# Stage 2: fires automatically when ANY classification lands
@app.on_change("pipeline.classified.*")
async def on_classified(event):
    doc_id = event.key.split(".")[-1]
    analysis = await app.call(
        f"{event.data['type']}-analyzer.analyze",
        document=event.data["document"],
    )
    await app.memory.global_scope.set(
        f"pipeline.analyzed.{doc_id}",
        analysis,
    )


# Stage 3: fires when the analyzer finishes
@app.on_change("pipeline.analyzed.*")
async def on_analyzed(event):
    doc_id = event.key.split(".")[-1]
    await app.call("reporter.generate", doc_id=doc_id, analysis=event.data)


app.run()

You can also scope subscriptions narrowly:
# Only fire on writes inside a specific session
@app.memory.session("session-123").on_change("conversation.*")
async def on_conversation(event):
    print(f"Session conversation updated: {event.key}")

What this gives you
- One agent's write becomes another agent's trigger — no queue setup, no polling.
- Pattern matching uses glob-style wildcards across dot-separated keys.
- Subscriptions can be scoped to global, session, actor, or workflow memory.
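
To make the trigger mechanics concrete, here is a minimal in-process sketch of pattern-matched subscriptions and a self-composing chain. It is not AgentField's implementation: `ToyMemory` and its handler signature are hypothetical, and Python's `fnmatch` stands in for the glob matching as an assumption (note that `fnmatch`'s `*` also matches across dots, which may differ from the real matcher).

```python
import asyncio
from fnmatch import fnmatchcase
from typing import Any, Awaitable, Callable

Handler = Callable[[str, Any], Awaitable[None]]


class ToyMemory:
    """Hypothetical in-process store with change subscriptions (illustration only)."""

    def __init__(self) -> None:
        self.data: dict[str, Any] = {}
        self.subscribers: list[tuple[str, Handler]] = []

    def on_change(self, pattern: str) -> Callable[[Handler], Handler]:
        # Decorator that registers a handler for keys matching a glob pattern.
        def register(handler: Handler) -> Handler:
            self.subscribers.append((pattern, handler))
            return handler

        return register

    async def set(self, key: str, value: Any) -> None:
        # Store the value, then fire every handler whose pattern matches the key.
        self.data[key] = value
        for pattern, handler in self.subscribers:
            if fnmatchcase(key, pattern):
                await handler(key, value)


memory = ToyMemory()
log: list[str] = []


@memory.on_change("pipeline.classified.*")
async def on_classified(key: str, value: Any) -> None:
    doc_id = key.split(".")[-1]
    log.append(f"stage2:{doc_id}")
    # Writing the next key is what triggers stage 3; no orchestrator involved.
    await memory.set(f"pipeline.analyzed.{doc_id}", {"summary": value["type"]})


@memory.on_change("pipeline.analyzed.*")
async def on_analyzed(key: str, value: Any) -> None:
    log.append(f"stage3:{key.split('.')[-1]}")


async def main() -> list[str]:
    await memory.set("pipeline.classified.doc-1", {"type": "invoice"})
    await memory.set("unrelated.key", 1)  # matches no pattern, so nothing fires
    return log


result = asyncio.run(main())
print(result)
```

A single write to `pipeline.classified.doc-1` runs stage 2, whose own write runs stage 3, while the unrelated key triggers nothing. In this sketch handlers run inline inside `set`; a distributed system would more plausibly deliver change events asynchronously.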