Server-Sent Events
Real-time streaming of workflow and execution updates
Server-Sent Events
Real-time streaming of workflow and execution updates
Real-time HTTP streaming for workflow events, execution updates, memory changes, and workflow notes. SSE provides live progress tracking as AI agents process multi-step workflows.
Workflow Run Events
Stream all events for a workflow run, including historical events and real-time updates.
curl -N "http://localhost:8080/api/v1/workflows/runs/{run_id}/events/stream"SSE Stream Format:
event: workflow_run_event
data: {"event_id":1,"run_id":"run_abc123","sequence":1,"event_type":"execution_started","status":"running","emitted_at":"2024-01-15T10:30:45Z"}
event: workflow_run_event
data: {"event_id":2,"run_id":"run_abc123","sequence":2,"event_type":"execution_completed","status":"succeeded","payload":{"duration_ms":1247},"emitted_at":"2024-01-15T10:30:47Z"}
: keep-alive
event: workflow_run_event
data: {"event_id":3,"run_id":"run_abc123","sequence":3,"event_type":"workflow_completed","status":"succeeded","emitted_at":"2024-01-15T10:30:48Z"}Query Parameters:
Prop
Type
Event Payload Structure:
Prop
Type
Use Case: Multi-Agent Workflow Monitoring
Track complex AI workflows where multiple agents coordinate:
// React component tracking multi-agent workflow
const WorkflowMonitor = ({ runId }) => {
const [events, setEvents] = useState([]);
const [status, setStatus] = useState('connecting');
useEffect(() => {
const eventSource = new EventSource(
`http://localhost:8080/api/v1/workflows/runs/${runId}/events/stream`
);
eventSource.addEventListener('workflow_run_event', (e) => {
const event = JSON.parse(e.data);
setEvents(prev => [...prev, event]);
// Update UI based on event type
if (event.event_type === 'workflow_completed') {
setStatus('completed');
eventSource.close();
} else if (event.event_type === 'execution_started') {
setStatus(`Processing: ${event.payload?.target || 'unknown'}`);
}
});
eventSource.onerror = () => {
setStatus('disconnected');
eventSource.close();
};
return () => eventSource.close();
}, [runId]);
return (
<div>
<h3>Workflow Status: {status}</h3>
<ul>
{events.map(e => (
<li key={e.sequence}>
[{e.sequence}] {e.event_type} - {e.status}
</li>
))}
</ul>
</div>
);
};Workflow Execution Events
Stream events for a specific execution within a workflow.
curl -N "http://localhost:8080/api/v1/workflows/executions/{execution_id}/events/stream"SSE Stream Format:
event: workflow_execution_event
data: {"event_id":1,"execution_id":"exec_abc123","workflow_id":"wf_xyz789","sequence":1,"event_type":"execution_started","status":"running","emitted_at":"2024-01-15T10:30:45Z"}
event: workflow_execution_event
data: {"event_id":2,"execution_id":"exec_abc123","sequence":2,"event_type":"execution_completed","status":"succeeded","payload":{"duration_ms":1247},"emitted_at":"2024-01-15T10:30:47Z"}Event Payload Structure:
Prop
Type
Use Case: Execution Debugging
Debug autonomous agents by streaming execution details:
import asyncio
import aiohttp
import json
async def debug_agent_execution(execution_id: str):
"""Debug AI agent execution with detailed event streaming"""
url = f"http://localhost:8080/api/v1/workflows/executions/{execution_id}/events/stream"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
async for line in response.content:
line = line.decode('utf-8').strip()
if line.startswith('event: '):
event_name = line[7:]
elif line.startswith('data: '):
event = json.loads(line[6:])
print(f"[{event['sequence']}] {event['event_type']}")
print(f" Status: {event.get('status')}")
if event.get('payload'):
print(f" Payload: {event['payload']}")
if event['event_type'] == 'execution_completed':
print("✅ Execution completed")
break
elif event['event_type'] == 'execution_failed':
print(f"❌ Execution failed: {event.get('status_reason')}")
break
# Debug a failing autonomous workflow
await debug_agent_execution("exec_abc123")Workflow Notes Events
Stream real-time notes added to workflows via app.note(). Useful for tracking AI agent decisions and progress.
curl -N "http://localhost:8080/api/ui/v1/workflows/{workflow_id}/notes/events"SSE Stream Format:
data: {"type":"connected","workflow_id":"wf_xyz789","message":"Workflow node notes stream connected","timestamp":"2024-01-15T10:30:45Z"}
data: {"type":"heartbeat","timestamp":"2024-01-15T10:31:15Z"}
data: {"type":"workflow_note_added","workflow_id":"wf_xyz789","execution_id":"exec_abc123","note":{"message":"Analyzing customer sentiment","tags":["sentiment","analysis"],"timestamp":"2024-01-15T10:31:20Z"}}Event Types:
connected- Initial connection confirmationheartbeat- Keep-alive message (every 30 seconds)workflow_note_added- New note added to workflow
Use Case: Real-Time AI Decision Tracking
Monitor AI agent reasoning and decisions in real-time:
// Track AI agent notes for observability
const NotesMonitor = ({ workflowId }) => {
const [notes, setNotes] = useState([]);
useEffect(() => {
const eventSource = new EventSource(
`http://localhost:8080/api/ui/v1/workflows/${workflowId}/notes/events`
);
eventSource.onmessage = (e) => {
const event = JSON.parse(e.data);
if (event.type === 'workflow_note_added') {
setNotes(prev => [...prev, event.note]);
// Log AI decisions
console.log(`🤖 ${event.note.message}`);
if (event.note.tags) {
console.log(` Tags: ${event.note.tags.join(', ')}`);
}
}
};
return () => eventSource.close();
}, [workflowId]);
return (
<div>
<h3>AI Agent Notes</h3>
{notes.map((note, i) => (
<div key={i}>
<p>{note.message}</p>
<small>{note.timestamp}</small>
</div>
))}
</div>
);
};Memory Change Events
Stream real-time memory updates across distributed agents. Track when agents read/write shared state.
curl -N "http://localhost:8080/api/v1/memory/events/sse?scope=session&scope_id=session_123"Query Parameters:
Prop
Type
SSE Stream Format:
event: message
data: {"id":"evt_123","type":"memory_change","timestamp":"2024-01-15T10:30:45Z","scope":"session","scope_id":"session_123","key":"user_preferences","action":"set","data":{"theme":"dark","language":"en"},"metadata":{"agent_id":"support-agent","workflow_id":"wf_xyz789"}}
event: message
data: {"id":"evt_124","type":"memory_change","timestamp":"2024-01-15T10:30:47Z","scope":"session","scope_id":"session_123","key":"conversation_context","action":"set","data":{"last_topic":"billing","sentiment":"frustrated"},"metadata":{"agent_id":"support-agent","workflow_id":"wf_xyz789"}}Event Payload Structure:
Prop
Type
Use Case: Cross-Agent State Synchronization
Monitor shared state changes across distributed AI agents:
async def monitor_agent_memory(scope: str, scope_id: str):
"""Monitor memory changes for debugging and observability"""
url = f"http://localhost:8080/api/v1/memory/events/sse?scope={scope}&scope_id={scope_id}"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
async for line in response.content:
line = line.decode('utf-8').strip()
if line.startswith('data: '):
event = json.loads(line[6:])
if event['type'] == 'memory_change':
action = event['action']
key = event['key']
agent = event['metadata'].get('agent_id', 'unknown')
if action == 'set':
print(f"💾 {agent} set {key} = {event['data']}")
elif action == 'delete':
print(f"🗑️ {agent} deleted {key}")
# Monitor session memory for autonomous customer service
await monitor_agent_memory("session", "session_user_123")Execution Events (UI)
Stream all execution events across the system. Useful for building real-time dashboards.
curl -N "http://localhost:8080/api/ui/v1/executions/events"SSE Stream Format:
data: {"type":"connected","message":"Execution events stream connected","timestamp":"2024-01-15T10:30:45Z"}
data: {"type":"heartbeat","timestamp":"2024-01-15T10:31:15Z"}
data: {"type":"execution_started","execution_id":"exec_abc123","workflow_id":"wf_xyz789","agent_node_id":"support-agent","reasoner_id":"analyze_sentiment","timestamp":"2024-01-15T10:31:20Z"}
data: {"type":"execution_completed","execution_id":"exec_abc123","status":"succeeded","duration_ms":1247,"timestamp":"2024-01-15T10:31:21Z"}Use Case: System-Wide Monitoring Dashboard
Build real-time dashboards showing all agent activity:
// Real-time execution monitoring
const ExecutionDashboard = () => {
const [executions, setExecutions] = useState([]);
const [stats, setStats] = useState({ total: 0, running: 0, succeeded: 0, failed: 0 });
useEffect(() => {
const eventSource = new EventSource(
'http://localhost:8080/api/ui/v1/executions/events'
);
eventSource.onmessage = (e) => {
const event = JSON.parse(e.data);
if (event.type === 'execution_started') {
setExecutions(prev => [...prev, { ...event, status: 'running' }]);
setStats(prev => ({ ...prev, total: prev.total + 1, running: prev.running + 1 }));
} else if (event.type === 'execution_completed') {
setExecutions(prev =>
prev.map(ex =>
ex.execution_id === event.execution_id
? { ...ex, status: event.status, duration_ms: event.duration_ms }
: ex
)
);
setStats(prev => ({
...prev,
running: prev.running - 1,
[event.status === 'succeeded' ? 'succeeded' : 'failed']:
prev[event.status === 'succeeded' ? 'succeeded' : 'failed'] + 1
}));
}
};
return () => eventSource.close();
}, []);
return (
<div>
<h3>System Stats</h3>
<p>Total: {stats.total} | Running: {stats.running} | Succeeded: {stats.succeeded} | Failed: {stats.failed}</p>
<h3>Recent Executions</h3>
<ul>
{executions.slice(-10).map(ex => (
<li key={ex.execution_id}>
{ex.agent_node_id}.{ex.reasoner_id} - {ex.status}
{ex.duration_ms && ` (${ex.duration_ms}ms)`}
</li>
))}
</ul>
</div>
);
};Resuming from Last Event
Use after_seq to resume streaming from where you left off, avoiding duplicate events:
// Resume SSE connection after disconnect
let lastSequence = 0;
function connectWithResume(runId) {
const url = lastSequence > 0
? `http://localhost:8080/api/v1/workflows/runs/${runId}/events/stream?after_seq=${lastSequence}`
: `http://localhost:8080/api/v1/workflows/runs/${runId}/events/stream`;
const eventSource = new EventSource(url);
eventSource.addEventListener('workflow_run_event', (e) => {
const event = JSON.parse(e.data);
lastSequence = event.sequence; // Track latest sequence
processEvent(event);
});
eventSource.onerror = () => {
eventSource.close();
// Reconnect after delay, resuming from last sequence
setTimeout(() => connectWithResume(runId), 5000);
};
}Keep-Alive Mechanism
Agentfield sends : keep-alive\n\n every 30 seconds to prevent connection timeouts. Clients should handle these gracefully:
async def stream_with_keepalive(run_id: str):
"""Handle SSE with keep-alive messages"""
url = f"http://localhost:8080/api/v1/workflows/runs/{run_id}/events/stream"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
async for line in response.content:
line = line.decode('utf-8').strip()
# Ignore keep-alive comments
if line.startswith(':'):
continue
if line.startswith('event: '):
event_type = line[7:]
elif line.startswith('data: '):
event_data = json.loads(line[6:])
print(f"Event: {event_type} - {event_data}")Connection Management
Reconnection Strategy with Exponential Backoff:
class SSEConnection {
constructor(url, onEvent) {
this.url = url;
this.onEvent = onEvent;
this.lastSequence = 0;
this.reconnectDelay = 1000;
this.maxReconnectDelay = 30000;
}
connect() {
const url = this.lastSequence > 0
? `${this.url}?after_seq=${this.lastSequence}`
: this.url;
this.eventSource = new EventSource(url);
this.eventSource.addEventListener('workflow_run_event', (e) => {
const event = JSON.parse(e.data);
this.lastSequence = event.sequence;
this.reconnectDelay = 1000; // Reset delay on success
this.onEvent(event);
});
this.eventSource.onerror = () => {
this.eventSource.close();
console.log(`Reconnecting in ${this.reconnectDelay}ms...`);
setTimeout(() => this.connect(), this.reconnectDelay);
// Exponential backoff
this.reconnectDelay = Math.min(
this.reconnectDelay * 2,
this.maxReconnectDelay
);
};
}
close() {
if (this.eventSource) {
this.eventSource.close();
}
}
}
// Usage
const connection = new SSEConnection(
'http://localhost:8080/api/v1/workflows/runs/run_abc123/events/stream',
(event) => console.log('Event:', event)
);
connection.connect();Related
- Async Execution - Queue long-running AI tasks
- Webhooks - HTTP callbacks for completion
- REST API - Polling-based status checks
- app.note - Add notes to workflows for SSE streaming