Skip to content
AgentField

Trigger agents on memory changes

Subscribe a reasoner to a memory key pattern. When any agent writes to a matching key, the reasoner fires.

Reactive multi-agent pipelines without an orchestrator. Stage 1 writes a result. A handler in stage 2 fires automatically. Stage 3 fires on stage 2's write. The chain composes itself.

from agentfield import Agent

app = Agent(node_id="pipeline-coordinator")

# Stage 1: classifier writes the document type
@app.reasoner()
async def classify(document: str, doc_id: str) -> dict:
    classification = await app.ai(system="Classify this document.", user=document)
    await app.memory.global_scope.set(
        f"pipeline.classified.{doc_id}",
        {"type": str(classification), "document": document},
    )
    return {"doc_id": doc_id}

# Stage 2: fires automatically when ANY classification lands
@app.on_change("pipeline.classified.*")
async def on_classified(event):
    doc_id = event.key.split(".")[-1]
    analysis = await app.call(
        f"{event.data['type']}-analyzer.analyze",
        document=event.data["document"],
    )
    await app.memory.global_scope.set(
        f"pipeline.analyzed.{doc_id}",
        analysis,
    )

# Stage 3: fires when the analyzer finishes
@app.on_change("pipeline.analyzed.*")
async def on_analyzed(event):
    doc_id = event.key.split(".")[-1]
    await app.call("reporter.generate", doc_id=doc_id, analysis=event.data)

app.run()

You can also scope subscriptions narrowly:

# Only fire on writes inside a specific session
@app.memory.session("session-123").on_change("conversation.*")
async def on_conversation(event):
    print(f"Session conversation updated: {event.key}")

What this gives you

  • One agent's write becomes another agent's trigger — no queue setup, no polling.
  • Pattern matching uses glob-style wildcards across dot-separated keys.
  • Subscriptions can be scoped to global, session, actor, or workflow memory.

Next