app.note()
Real-time execution progress reporting for autonomous systems
app.note()
Real-time execution progress reporting for autonomous systems
Real-time progress reporting from autonomous agents to external systems. Sends structured, markdown-formatted execution updates that frontends, microservices, and monitoring systems consume via Server-Sent Events.
Fire-and-Forget: Notes are sent asynchronously in the background and never block execution. Failures are silently handled to prevent workflow interruption.
Basic Example
from agentfield import Agent
app = Agent(node_id="autonomous_underwriter")
@app.reasoner()
async def autonomous_loan_decision(application: dict) -> dict:
"""AI makes loan decisions with full transparency - no black box.
Replaces: Traditional underwriting with manual review,
hardcoded credit scoring rules, and opaque decision processes.
"""
app.note(
"## AI Underwriting Started\n\nAnalyzing loan application with ML models and regulatory compliance",
tags=["ai-decision", "underwriting"]
)
# AI analyzes creditworthiness - replaces complex rule engine
credit_analysis = await app.ai(
system="You are a loan underwriter. Analyze creditworthiness considering income, debt, credit history, and risk factors.",
user=f"Application: {application}",
schema=CreditAnalysis
)
# Transparent AI reasoning - auditable and explainable
app.note(
f"""## AI Credit Analysis
**Credit Score Impact**: {credit_analysis.score_impact}
**Debt-to-Income Ratio**: {credit_analysis.dti_ratio}
**Risk Factors**: {', '.join(credit_analysis.risk_factors)}
**AI Recommendation**: {credit_analysis.decision}
**Confidence**: {credit_analysis.confidence}%
**Regulatory Compliance**: {credit_analysis.compliance_notes}
**Fair Lending Check**: ✓ Passed
""",
tags=["ai-reasoning", "credit-analysis"]
)
if credit_analysis.decision == "decline":
app.note(
f"## Application Declined\n\n**Reason**: {credit_analysis.decline_reason}\n**Adverse Action Notice**: Sent to applicant",
tags=["decision", "declined"]
)
return {"decision": "declined", "reason": credit_analysis.decline_reason}
# AI approved - execute loan processing
app.note("## Loan Approved\n\nProcessing loan documents and funding", tags=["decision", "approved"])
app.note(
f"""## Loan Funded
**Loan Amount**: ${credit_analysis.approved_amount}
**Interest Rate**: {credit_analysis.interest_rate}%
**Term**: {credit_analysis.term_months} months
**AI Decision Audit Trail**: Complete
**Regulatory Documentation**: Generated
""",
tags=["success", "funded"]
)
return {
"decision": "approved",
"amount": credit_analysis.approved_amount,
"rate": credit_analysis.interest_rate
}Parameters
Prop
Type
Common Patterns
Autonomous Medical Diagnosis
AI diagnoses medical conditions with full transparency for physician review.
@app.reasoner()
async def autonomous_medical_diagnosis(
patient_data: dict,
symptoms: list,
medical_history: dict
) -> dict:
"""AI provides diagnostic recommendations with transparent reasoning.
Replaces: Manual symptom checkers, rigid diagnostic flowcharts,
and time-consuming preliminary assessments.
"""
app.note(
f"## AI Diagnostic Analysis Started\n\n**Patient ID**: {patient_data['id']}\n**Chief Complaints**: {', '.join(symptoms)}\n**Analysis Mode**: Differential Diagnosis",
tags=["ai-diagnosis", "medical", "start"]
)
# AI analyzes symptoms with medical knowledge
diagnostic_analysis = await app.ai(
system="You are a medical diagnostic AI. Analyze symptoms, medical history, and provide differential diagnosis with confidence levels.",
user=f"Symptoms: {symptoms}\nMedical History: {medical_history}\nVitals: {patient_data.get('vitals')}",
schema=DiagnosticAnalysis
)
app.note(
f"""## AI Differential Diagnosis
**Primary Diagnosis**: {diagnostic_analysis.primary_diagnosis}
**Confidence**: {diagnostic_analysis.confidence}%
**Differential Diagnoses**:
{chr(10).join(f'- {d.condition} ({d.probability}%)' for d in diagnostic_analysis.differentials)}
**Key Findings**:
{chr(10).join(f'- {finding}' for finding in diagnostic_analysis.key_findings)}
**Red Flags**: {', '.join(diagnostic_analysis.red_flags) if diagnostic_analysis.red_flags else 'None identified'}
""",
tags=["ai-reasoning", "diagnosis"]
)
# AI recommends tests and treatment
app.note("## Generating Recommendations\n\nAI determining optimal diagnostic tests and treatment plan", tags=["recommendations"])
recommendations = await app.ai(
system="Based on diagnostic analysis, recommend appropriate tests, imaging, and initial treatment approach.",
user=f"Diagnosis: {diagnostic_analysis}",
schema=MedicalRecommendations
)
app.note(
f"""## AI Recommendations
**Recommended Tests**:
{chr(10).join(f'- {test.name}: {test.rationale}' for test in recommendations.tests)}
**Imaging**:
{chr(10).join(f'- {img}' for img in recommendations.imaging)}
**Initial Treatment Plan**:
{recommendations.treatment_plan}
**Urgency Level**: {recommendations.urgency}
**Specialist Referral**: {recommendations.specialist_referral or 'Not required'}
**Physician Review Required**: Yes
**AI Confidence Threshold for Auto-Action**: Not met - human oversight required
""",
tags=["recommendations", "treatment-plan"]
)
return {
"diagnosis": diagnostic_analysis,
"recommendations": recommendations,
"requires_physician_review": True,
"ai_confidence": diagnostic_analysis.confidence
}Cross-Agent Coordination
Communicate reasoning between autonomous agents in distributed systems.
@app.reasoner()
async def coordinate_supply_chain(order: dict) -> dict:
"""Coordinate multiple autonomous agents for order fulfillment."""
app.note(
"## Supply Chain Coordination\n\nInitiating multi-agent workflow",
tags=["coordination", "start"]
)
# Call inventory agent
app.note("## Checking Inventory\n\nQuerying inventory agent", tags=["inventory"])
inventory = await app.call(
"inventory_agent.check_availability",
items=order["items"]
)
app.note(
f"## Inventory Response\n\n**Available**: {inventory['available']}\n**Warehouse**: {inventory['warehouse_id']}",
tags=["inventory", "response"]
)
# Call logistics agent
app.note(
"## Scheduling Delivery\n\nCoordinating with logistics agent",
tags=["logistics"]
)
delivery = await app.call(
"logistics_agent.schedule_delivery",
warehouse_id=inventory["warehouse_id"],
destination=order["shipping_address"]
)
app.note(
f"## Delivery Scheduled\n\n**ETA**: {delivery['eta']}\n**Carrier**: {delivery['carrier']}\n**Tracking**: {delivery['tracking_number']}",
tags=["logistics", "scheduled"]
)
return {
"status": "coordinated",
"inventory": inventory,
"delivery": delivery
}Human-in-the-Loop Approval Workflows
Provide real-time transparency for workflows requiring human oversight.
@app.reasoner()
async def autonomous_contract_review(
contract_text: str,
contract_id: str
) -> dict:
"""Review contract with human approval for high-risk clauses."""
app.note(
f"## Contract Review Started\n\n**Contract ID**: {contract_id}\n**Length**: {len(contract_text)} characters",
tags=["review", "start"]
)
# AI analysis
app.note("## AI Analysis\n\nAnalyzing contract clauses and risks", tags=["analysis"])
analysis = await app.ai(
system="You are a contract analyst.",
user=f"Analyze contract: {contract_text}",
schema=ContractAnalysis
)
app.note(
f"## Analysis Complete\n\n**Risk Level**: {analysis.risk_level}\n**Key Clauses**: {len(analysis.clauses)}\n**Concerns**: {len(analysis.concerns)}",
tags=["analysis", "complete"]
)
# High-risk requires human approval
if analysis.risk_level == "high":
app.note(
f"## Human Approval Required\n\n**Reason**: High-risk clauses detected\n\n**Concerns**:\n{chr(10).join(f'- {c}' for c in analysis.concerns)}\n\n**Status**: Awaiting human review",
tags=["approval", "waiting", "high-risk"]
)
# Frontend displays this note and prompts human reviewer
# Agent waits for approval via webhook or polling
approval = await wait_for_human_approval(contract_id)
app.note(
f"## Human Decision\n\n**Approved**: {approval.approved}\n**Reviewer**: {approval.reviewer}\n**Comments**: {approval.comments}",
tags=["approval", "decision"]
)
return {
"status": "approved" if approval.approved else "rejected",
"analysis": analysis,
"approval": approval
}
# Auto-approve low-risk
app.note(
"## Auto-Approved\n\nLow-risk contract, no human review needed",
tags=["approval", "auto"]
)
return {"status": "approved", "analysis": analysis}Compliance & Audit Trails
Generate structured reasoning documentation for regulatory compliance.
@app.reasoner()
async def compliance_check(
transaction: dict,
customer_id: str
) -> dict:
"""Perform compliance checks with full audit trail."""
app.note(
f"## Compliance Check Initiated\n\n**Transaction ID**: {transaction['id']}\n**Amount**: ${transaction['amount']}\n**Customer**: {customer_id}\n**Timestamp**: {datetime.now().isoformat()}",
tags=["compliance", "audit", "start"]
)
# AML screening
app.note("## AML Screening\n\nChecking against sanctions lists", tags=["aml"])
aml_result = await screen_aml(customer_id, transaction)
app.note(
f"## AML Results\n\n**Status**: {aml_result.status}\n**Matches**: {aml_result.matches}\n**Risk Score**: {aml_result.risk_score}\n\n**Details**: {aml_result.details}",
tags=["aml", "results", "audit"]
)
# Fraud detection
app.note("## Fraud Detection\n\nAnalyzing transaction patterns", tags=["fraud"])
fraud_result = await detect_fraud(transaction, customer_id)
app.note(
f"## Fraud Analysis\n\n**Risk Level**: {fraud_result.risk}\n**Indicators**: {fraud_result.indicators}\n**Model Confidence**: {fraud_result.confidence}\n\n**Reasoning**: {fraud_result.explanation}",
tags=["fraud", "results", "audit"]
)
# Final decision
decision = "approved" if aml_result.status == "clear" and fraud_result.risk == "low" else "flagged"
app.note(
f"## Compliance Decision\n\n**Decision**: {decision.upper()}\n**AML**: {aml_result.status}\n**Fraud Risk**: {fraud_result.risk}\n\n**Regulatory Basis**: FINRA Rule 3310, BSA/AML Requirements\n**Reviewed By**: Autonomous Compliance Agent v2.1\n**Review Date**: {datetime.now().isoformat()}",
tags=["compliance", "decision", "audit", "final"]
)
return {
"decision": decision,
"aml": aml_result,
"fraud": fraud_result,
"audit_trail_complete": True
}Error Context for Autonomous Recovery
Capture rich context when failures occur for intelligent recovery.
@app.reasoner()
async def resilient_data_processing(
data_source: str,
processing_config: dict
) -> dict:
"""Process data with autonomous error recovery."""
app.note(
f"## Data Processing Started\n\n**Source**: {data_source}\n**Config**: {processing_config}",
tags=["processing", "start"]
)
try:
# Attempt processing
app.note("## Fetching Data\n\nConnecting to data source", tags=["fetch"])
data = await fetch_data(data_source)
app.note(
f"## Data Retrieved\n\n**Records**: {len(data)}\n**Size**: {sys.getsizeof(data)} bytes",
tags=["fetch", "success"]
)
# Process
app.note("## Processing Data\n\nApplying transformations", tags=["transform"])
result = await transform_data(data, processing_config)
app.note(
f"## Processing Complete\n\n**Output Records**: {len(result)}",
tags=["transform", "success"]
)
return {"status": "success", "records": len(result)}
except ConnectionError as e:
# Capture error context
app.note(
f"## Connection Error\n\n**Error**: {str(e)}\n**Source**: {data_source}\n**Attempt**: 1/3\n\n**Recovery Strategy**: Retry with exponential backoff",
tags=["error", "connection", "recovery"]
)
# Autonomous recovery
app.note("## Initiating Recovery\n\nRetrying with backoff", tags=["recovery"])
await asyncio.sleep(2)
try:
data = await fetch_data(data_source)
app.note("## Recovery Successful\n\nConnection restored", tags=["recovery", "success"])
result = await transform_data(data, processing_config)
return {"status": "recovered", "records": len(result)}
except Exception as retry_error:
app.note(
f"## Recovery Failed\n\n**Error**: {str(retry_error)}\n**Action**: Escalating to fallback source\n\n**Fallback**: {processing_config.get('fallback_source')}",
tags=["error", "recovery", "escalation"]
)
# Try fallback
fallback_data = await fetch_data(processing_config["fallback_source"])
result = await transform_data(fallback_data, processing_config)
app.note(
f"## Fallback Successful\n\n**Source**: Fallback\n**Records**: {len(result)}",
tags=["recovery", "fallback", "success"]
)
return {"status": "fallback", "records": len(result)}How It Works
When you call app.note(), the SDK:
- Retrieves execution context - Automatically includes execution ID, workflow ID, agent node ID, and session information
- Sends asynchronously - Creates background task/thread that sends to Agentfield server's
/api/ui/v1/executions/noteendpoint - Never blocks - Fire-and-forget pattern ensures notes never interrupt workflow execution
- Handles failures silently - Network errors or server issues don't propagate to your code
- Associates automatically - Notes are linked to the current execution in Agentfield's workflow DAG
External systems consume notes via:
- Server-Sent Events (SSE) - Real-time streaming to frontends
- REST API - Polling for batch processing
- Webhooks - Push notifications to external services
Notes are automatically associated with the current execution context. No manual ID management required - the SDK handles all context propagation.
Production Use Cases
Real-Time Dashboard Updates
Autonomous agents report progress to monitoring dashboards showing live execution status across distributed systems.
# Agent reports progress
app.note("## Model Training\n\nEpoch 45/100 - Loss: 0.023", tags=["training", "progress"])
# Dashboard receives via SSE and updates UI in real-time
# Shows: "Model Training - Epoch 45/100 - Loss: 0.023"Microservice Communication
Agents communicate reasoning to other microservices for coordinated autonomous behavior.
# Agent A reports decision
app.note(
"## Scaling Decision\n\nIncreasing capacity by 50% due to traffic spike",
tags=["scaling", "decision"]
)
# Microservice B consumes note via SSE
# Adjusts load balancer configuration accordinglyCompliance Documentation
Automatically generate audit trails for regulatory compliance without manual logging.
# Every decision is documented
app.note(
f"## Loan Approval\n\n**Decision**: Approved\n**Credit Score**: {score}\n**Debt-to-Income**: {dti}\n**Regulatory Basis**: TILA, ECOA",
tags=["compliance", "loan", "audit"]
)
# Compliance system archives all notes with tags=["compliance", "audit"]Human Oversight Systems
Provide transparency for autonomous systems requiring human supervision.
# Agent reports reasoning for human review
app.note(
"## High-Value Transaction\n\nAmount exceeds threshold - requesting approval",
tags=["approval", "human-review"]
)
# Human operator sees note in real-time dashboard
# Can intervene or approve via UIBest Practices
Use Markdown for Structure
Format notes with headers, lists, and emphasis for readability.
# ✅ Well-structured
app.note(
"""## Analysis Complete
**Findings**:
- Revenue up 23%
- Costs down 12%
- Profit margin: 34%
**Recommendation**: Increase production capacity
""",
tags=["analysis", "recommendation"]
)
# ❌ Unstructured
app.note("analysis done revenue up costs down recommend increase", tags=["analysis"])Tag Consistently
Use consistent tag conventions for filtering and routing.
# ✅ Consistent tagging scheme
app.note("Starting validation", tags=["validation", "start"])
app.note("Validation complete", tags=["validation", "complete"])
app.note("Error in validation", tags=["validation", "error"])
# ❌ Inconsistent tags
app.note("Starting validation", tags=["begin"])
app.note("Validation complete", tags=["done", "finished"])
app.note("Error in validation", tags=["problem", "issue", "error"])Report Key Decision Points
Focus on decisions, not implementation details.
# ✅ Decision-focused
app.note(
"## Routing Decision\n\nSending to premium queue based on customer tier",
tags=["routing", "decision"]
)
# ❌ Implementation details
app.note("Checking if customer.tier == 'premium'", tags=["code"])Include Context for Autonomous Systems
Provide enough context for other systems to understand and act.
# ✅ Rich context
app.note(
f"## Capacity Alert\n\n**Current**: 87%\n**Threshold**: 80%\n**Action**: Scaling up\n**ETA**: 2 minutes",
tags=["capacity", "scaling", "alert"]
)
# ❌ Minimal context
app.note("Scaling", tags=["scaling"])Use Notes for Progress, Not Debugging
Notes are for production progress reporting. Use standard logging for debugging.
# ✅ Production progress
app.note("## Payment Processing\n\nCharging customer", tags=["payment"])
# ❌ Debug information (use logging instead)
app.note(f"DEBUG: variable x = {x}, y = {y}", tags=["debug"])Performance Considerations
Async Delivery:
- Notes are sent in background tasks/threads
- Zero blocking overhead on main execution
- Failed deliveries don't interrupt workflow
Network Overhead:
- Each note is ~1-5KB depending on message length
- Sent via HTTP POST to Agentfield server
- Consider note frequency for high-throughput systems
Best Practices:
- Emit notes at key milestones, not every line
- Use tags for filtering instead of verbose messages
- Batch related updates when possible
Related
- Server-Sent Events - Consume notes in real-time from frontends
- Execution Context - Automatic context propagation for notes
- Workflow DAGs - View notes in workflow execution graphs
- @app.reasoner - Use notes within reasoners for transparency
- Monitoring - Aggregate notes for system-wide observability