Async Execution

Queue long-running AI tasks with webhook callbacks and status polling

Queue long-running AI tasks and receive immediate confirmation. Agentfield returns an execution_id instantly while processing continues in the background. Get notified via webhook when complete, or poll for status updates.

Why Async Matters for Autonomous Software

AI reasoning, multi-step workflows, and LLM calls create unpredictable latency—from seconds to minutes, or even hours to days for complex nested workflows. Synchronous execution blocks your application during this time. Async execution lets you queue the work, return immediately to users, and handle completion asynchronously. This pattern is essential for realistic autonomous software where AI agents coordinate complex tasks.

Architecture: How It Works Under the Hood

Agentfield's control plane handles async execution natively, supporting arbitrarily long-running executions without timeout limits. This is critical for nested agent workflows where Agent A calls Agent B, which calls Agent C—all running asynchronously.

The Flow:

  1. Client → Control Plane: Client POSTs to /api/v1/execute/async/{target}. Control plane creates execution record (status: running), forwards request to agent with headers (X-Run-ID, X-Execution-ID), and returns 202 Accepted immediately.

  2. Control Plane → Agent: Agent receives request with X-Execution-ID header, returns 202 Accepted immediately, and starts reasoner in background. Execution record stays in running state.

  3. Agent Processing: Reasoner runs as long as needed (hours/days). Nested app.call() operations work seamlessly. All state tracked in control plane.

  4. Agent → Control Plane: When finished, the agent sends POST /api/v1/executions/{execution_id}/status with the final status (succeeded/failed), result payload, duration, and optional progress updates.

  5. Control Plane Callback: Status handler updates execution record, writes result to storage, emits SSE events, updates workflow state, and triggers registered webhooks.

  6. Clients Poll/SSE/Webhooks: Original caller polls /api/v1/executions/{id}, listens to SSE, or receives webhook. No "context deadline exceeded" even for extremely long runs.

SDK Coverage: Python and Go SDKs implement this automatically—no configuration needed. Agents inherit async semantics as soon as they receive control plane headers.
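The SDKs send the completion callback from step 4 for you, so the following is only a rough Python sketch of what such a request could contain. The endpoint path comes from the flow above; the body field names (status, result, error, duration_ms) are assumptions modelled on the status responses shown later on this page, and build_status_update is a hypothetical helper, not part of any SDK.

```python
import json
import time

CONTROL_PLANE = "http://localhost:8080"  # assumed local control plane address


def build_status_update(execution_id, started_at, result=None, error=None):
    """Build the URL and JSON body an agent might POST when a reasoner finishes.

    Hypothetical helper: the field names are assumptions, not a documented schema.
    started_at is a time.monotonic() reading taken when the reasoner started.
    """
    status = "failed" if error is not None else "succeeded"
    body = {
        "status": status,
        "result": result,
        "error": error,
        "duration_ms": int((time.monotonic() - started_at) * 1000),
    }
    url = f"{CONTROL_PLANE}/api/v1/executions/{execution_id}/status"
    return url, json.dumps(body)


started = time.monotonic()
url, body = build_status_update("exec_abc123", started, result={"sources": 23})
print(url)
print(body)
```

Because the agent reports completion itself, no HTTP connection has to stay open for the duration of the reasoner.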

HTTP API

Queue Execution

Submit a long-running task and receive an execution ID immediately:

curl -X POST http://localhost:8080/api/v1/execute/async/research-agent.deep_analysis \
  -H "Content-Type: application/json" \
  -d '{
    "input": {
      "topic": "autonomous software trends",
      "depth": "comprehensive"
    },
    "webhook": {
      "url": "https://app.example.com/webhooks/agentfield",
      "secret": "your-webhook-secret"
    }
  }'

Response (202 Accepted):

{
  "execution_id": "exec_abc123",
  "workflow_id": "wf_xyz789",
  "run_id": "run_def456",
  "status": "queued",
  "target": "research-agent.deep_analysis",
  "type": "reasoner",
  "created_at": "2024-01-15T10:30:45Z",
  "webhook_registered": true,
  "webhook_error": null
}

The execution processes in the background while your application remains responsive. Use the execution_id to track progress or wait for webhook notification. If webhook validation fails (for example, invalid URL scheme), Agentfield still returns 202 Accepted but sets webhook_registered to false and includes a message in webhook_error so you can surface the issue to users.
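The same submission can be sketched in Python with only the standard library. The helper names below (build_async_request, webhook_warning, submit) are illustrative, and the base URL is the local address used in the curl example. webhook_warning is meant to be called only when you asked for a webhook in the first place.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080"  # same local address as the curl example


def build_async_request(target, input_data, webhook=None):
    """Create (but do not send) the POST request for an async execution."""
    payload = {"input": input_data}
    if webhook is not None:
        payload["webhook"] = webhook
    return urllib.request.Request(
        f"{BASE_URL}/api/v1/execute/async/{target}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def webhook_warning(response_body):
    """Return a message to surface if the requested webhook was not registered."""
    if response_body.get("webhook_registered"):
        return None
    return response_body.get("webhook_error") or "webhook was not registered"


def submit(target, input_data, webhook=None):
    """Send the request and return the parsed 202 response body."""
    request = build_async_request(target, input_data, webhook)
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())
```

A caller would typically store the returned execution_id and show the result of webhook_warning to the user when it is not None.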

Check Status

Poll for execution status when webhooks aren't available:

curl http://localhost:8080/api/v1/executions/exec_abc123

Response (200 OK):

{
  "execution_id": "exec_abc123",
  "run_id": "run_def456",
  "status": "running",
  "started_at": "2024-01-15T10:30:50Z",
  "completed_at": null,
  "duration_ms": null,
  "result": null,
  "error": null,
  "webhook_registered": true
}

Status Values:

  • queued - Accepted and waiting to start
  • pending - Scheduled but not yet dispatched (legacy name in older responses)
  • running - Currently executing
  • succeeded - Completed successfully
  • failed - Execution failed (inspect the error field)
  • cancelled - Execution was cancelled
  • timeout - Execution exceeded time limit
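A polling client only needs to know which of these statuses are final. The sketch below treats succeeded, failed, cancelled, and timeout as terminal and everything else as still in progress. fetch_status is a placeholder for whatever function you use to GET /api/v1/executions/{id} and parse the JSON; the interval and poll limit are arbitrary defaults, not values recommended by Agentfield.

```python
import time

# Statuses after which the execution record no longer changes
TERMINAL_STATUSES = {"succeeded", "failed", "cancelled", "timeout"}


def is_terminal(status):
    """True once an execution has reached a final state."""
    return status in TERMINAL_STATUSES


def wait_for_completion(fetch_status, execution_id, interval=2.0,
                        max_polls=30, sleep=time.sleep):
    """Poll until the execution is terminal or max_polls is reached.

    fetch_status: callable taking an execution ID and returning the parsed
    status record (a dict with at least a "status" key).
    """
    for _ in range(max_polls):
        record = fetch_status(execution_id)
        if is_terminal(record["status"]):
            return record
        sleep(interval)
    raise TimeoutError(f"{execution_id} not finished after {max_polls} polls")
```

Passing sleep as a parameter keeps the loop easy to test without real delays. For executions that run for hours, a webhook is usually a better fit than a tight polling loop.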

Batch Status Check

Monitor multiple concurrent AI agents efficiently:

curl -X POST http://localhost:8080/api/v1/executions/batch-status \
  -H "Content-Type: application/json" \
  -d '{
    "execution_ids": ["exec_abc123", "exec_def456", "exec_ghi789"]
  }'

Response (200 OK):

{
  "exec_abc123": {
    "execution_id": "exec_abc123",
    "run_id": "run_def456",
    "status": "succeeded",
    "result": {
      "summary": "Comprehensive analysis of 23 sources...",
      "sources": 23
    },
    "error": null,
    "started_at": "2024-01-15T10:30:45Z",
    "completed_at": "2024-01-15T10:33:15Z",
    "duration_ms": 150000,
    "webhook_registered": true
  },
  "exec_def456": {
    "execution_id": "exec_def456",
    "run_id": "run_def456",
    "status": "running",
    "result": null,
    "error": null,
    "started_at": "2024-01-15T10:32:10Z",
    "completed_at": null,
    "duration_ms": null,
    "webhook_registered": false
  }
}
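One way to consume a batch response is to split it into finished and in-flight executions. Note that the sample above returns two of the three requested IDs; this page does not say how unknown IDs are reported, so the sketch simply lists any requested ID that is absent from the response instead of guessing. split_batch and missing_ids are illustrative names.

```python
TERMINAL_STATUSES = {"succeeded", "failed", "cancelled", "timeout"}


def split_batch(batch_response):
    """Separate a batch-status response into finished records and in-flight IDs."""
    finished, in_flight = {}, []
    for execution_id, record in batch_response.items():
        if record["status"] in TERMINAL_STATUSES:
            finished[execution_id] = record
        else:
            in_flight.append(execution_id)
    return finished, in_flight


def missing_ids(requested_ids, batch_response):
    """Requested IDs that do not appear in the response at all."""
    return [eid for eid in requested_ids if eid not in batch_response]


# Trimmed copy of the sample response above
sample = {
    "exec_abc123": {"status": "succeeded", "result": {"sources": 23}},
    "exec_def456": {"status": "running", "result": None},
}
finished, in_flight = split_batch(sample)
absent = missing_ids(["exec_abc123", "exec_def456", "exec_ghi789"], sample)
```

Only the IDs in in_flight need to be sent in the next batch-status request.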

Webhook Callbacks

When execution completes, Agentfield POSTs to your webhook URL with the result:

{
  "event": "execution.completed",
  "execution_id": "exec_abc123",
  "workflow_id": "wf_xyz789",
  "status": "succeeded",
  "target": "research-agent.deep_analysis",
  "type": "reasoner",
  "duration_ms": 127450,
  "result": {
    "summary": "Comprehensive analysis of 23 sources...",
    "sources": 23
  },
  "timestamp": "2024-01-15T10:33:15Z"
}

Webhook Configuration

Configure webhooks when submitting async executions:

{
  "input": {
    "topic": "market analysis"
  },
  "webhook": {
    "url": "https://app.example.com/webhooks/agentfield",
    "secret": "your-webhook-secret",
    "headers": {
      "X-Custom-ID": "research-123",
      "X-User-ID": "user_456"
    }
  }
}

Webhook Security

Agentfield signs webhook payloads with HMAC-SHA256 whenever you supply a webhook secret. Verify the signature to ensure authenticity:

import hashlib
import hmac

from fastapi import FastAPI, HTTPException, Request

app = FastAPI()

# Load this from configuration or the environment in a real deployment
WEBHOOK_SECRET = "your-webhook-secret"
SIGNATURE_PREFIX = "sha256="

def verify_webhook(request_body: bytes, signature: str, secret: str) -> bool:
    """Verify Agentfield webhook signature"""
    # Signature format: "sha256=<hex_digest>"
    if not signature.startswith(SIGNATURE_PREFIX):
        return False

    expected = hmac.new(
        secret.encode(),
        request_body,
        hashlib.sha256
    ).hexdigest()

    # Strip only the leading prefix, not every occurrence of "sha256="
    received = signature[len(SIGNATURE_PREFIX):]

    # Constant-time comparison on bytes, so a non-ASCII header value
    # is rejected instead of raising TypeError
    return hmac.compare_digest(expected.encode(), received.encode())

# In your webhook handler
@app.post("/webhooks/agentfield")
async def handle_agentfield_webhook(request: Request):
    # Verify against the raw bytes, before any JSON parsing
    body = await request.body()
    signature = request.headers.get("X-Agentfield-Signature", "")

    if not verify_webhook(body, signature, WEBHOOK_SECRET):
        raise HTTPException(status_code=401, detail="Invalid signature")

    payload = await request.json()

    # Process completion
    if payload["event"] == "execution.completed":
        execution_id = payload["execution_id"]
        result = payload["result"]
        # Update your application state (notify_user is your own coroutine)
        await notify_user(execution_id, result)

    return {"status": "received"}

Webhook Headers:

  • X-Agentfield-Signature: sha256=<hex_signature> (present only when a webhook secret is provided)
  • Custom headers from webhook.headers configuration
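To exercise a handler locally without waiting for a real execution, you can produce a header value in the same sha256=<hex_signature> format. This sketch assumes, as the verification code above does, that the HMAC-SHA256 is computed over the raw request body with the webhook secret as the key. sign_payload is a hypothetical test helper.

```python
import hashlib
import hmac
import json


def sign_payload(body: bytes, secret: str) -> str:
    """Return a value for X-Agentfield-Signature: "sha256=<hex_digest>"."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


# Serialize once and sign exactly the bytes that will be sent
body = json.dumps(
    {"event": "execution.completed", "execution_id": "exec_abc123"}
).encode("utf-8")
header_value = sign_payload(body, "your-webhook-secret")
print(header_value)
```

Send body unchanged with that header to your handler. Re-serializing the JSON after signing can change the bytes and invalidate the signature.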

Rust Example

Using the reqwest crate for async HTTP requests:

use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();

    // Submit async execution
    let response = client
        .post("http://localhost:8080/api/v1/execute/async/research-agent.deep_analysis")
        .header("Content-Type", "application/json")
        .header("X-Session-ID", "session_123")
        .json(&json!({
            "input": {
                "topic": "autonomous software trends",
                "depth": "comprehensive"
            },
            "webhook": {
                "url": "https://app.example.com/webhooks/agentfield",
                "secret": "your-webhook-secret"
            }
        }))
        .send()
        .await?;

    let result: serde_json::Value = response.json().await?;
    let execution_id = result["execution_id"].as_str().unwrap();

    println!("Queued: {}", execution_id);

    // Poll for status
    loop {
        let status_response = client
            .get(format!("http://localhost:8080/api/v1/executions/{}", execution_id))
            .send()
            .await?;

        let status: serde_json::Value = status_response.json().await?;

        match status["status"].as_str().unwrap() {
            "succeeded" => {
                println!("Result: {:?}", status["result"]);
                break;
            }
            "failed" | "cancelled" | "timeout" => {
                eprintln!("Error: {:?}", status["error"]);
                break;
            }
            _ => {
                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
            }
        }
    }

    Ok(())
}

Autonomous Software Patterns

Multi-Agent Coordination

Coordinate multiple AI agents asynchronously for complex workflows using REST:

# Agent A: Sentiment analysis (fast, 2-3 seconds)
curl -X POST http://localhost:8080/api/v1/execute/async/sentiment-agent.analyze \
  -H "Content-Type: application/json" \
  -H "X-Workflow-ID: wf_customer_support_001" \
  -d '{
    "input": {
      "messages": ["This is frustrating!", "Third time calling"]
    }
  }' | jq -r '.execution_id' > sentiment_exec.txt

# Agent B: Deep research (slow, 2-5 minutes)
curl -X POST http://localhost:8080/api/v1/execute/async/research-agent.comprehensive_analysis \
  -H "Content-Type: application/json" \
  -H "X-Workflow-ID: wf_customer_support_001" \
  -d '{
    "input": {
      "customer_id": "cust_123",
      "context": "escalation"
    }
  }' | jq -r '.execution_id' > research_exec.txt

# Agent C: Report generation (medium, 30-60 seconds)
curl -X POST http://localhost:8080/api/v1/execute/async/report-agent.generate \
  -H "Content-Type: application/json" \
  -H "X-Workflow-ID: wf_customer_support_001" \
  -d '{
    "input": {
      "sentiment_id": "'$(cat sentiment_exec.txt)'",
      "research_id": "'$(cat research_exec.txt)'"
    }
  }'

# All agents work in parallel - use webhooks or poll for completion

Webhook-First Architecture

Design autonomous systems around webhook notifications:

# Submit long-running AI task with webhook
curl -X POST http://localhost:8080/api/v1/execute/async/research-agent.deep_analysis \
  -H "Content-Type: application/json" \
  -H "X-Session-ID: session_user_456" \
  -H "X-Actor-ID: user_456" \
  -d '{
    "input": {
      "topic": "competitive landscape",
      "depth": "comprehensive"
    },
    "webhook": {
      "url": "https://app.example.com/webhooks/agentfield",
      "secret": "your-webhook-secret",
      "headers": {
        "X-User-ID": "user_456"
      }
    }
  }'

# Response (202 Accepted):
# {
#   "execution_id": "exec_abc123",
#   "run_id": "run_xyz789",
#   "workflow_id": "run_xyz789",
#   "status": "queued",
#   "webhook_registered": true
# }

# Return immediately to user - webhook will notify when complete

Webhook Handler (Python example):

from fastapi import FastAPI, Request, HTTPException
import hmac
import hashlib

app = FastAPI()

@app.post("/webhooks/agentfield")
async def handle_completion(request: Request):
    body = await request.body()
    signature = request.headers.get("X-Agentfield-Signature", "")

    # Verify signature
    expected = hmac.new(
        b"your-webhook-secret",
        body,
        hashlib.sha256
    ).hexdigest()

    if not signature.startswith("sha256="):
        raise HTTPException(status_code=401)

    if not hmac.compare_digest(f"sha256={expected}", signature):
        raise HTTPException(status_code=401)

    payload = await request.json()

    if payload["event"] == "execution.completed":
        # Update database
        await db.update_analysis(
            execution_id=payload["execution_id"],
            result=payload["result"],
            status=payload["status"]  # record the reported status instead of assuming success
        )

        # Notify user (look up the recipient using your application data)
        await notify_user_for_run(
            run_id=payload["workflow_id"],
            execution_id=payload["execution_id"],
            result_url=f"/results/{payload['execution_id']}"
        )

    return {"status": "received"}

Handling Backpressure

Agentfield returns 429 Too Many Requests when the execution queue is full. Handle with retry logic:

#!/bin/bash

# Function to submit with retry
submit_with_retry() {
    local target=$1
    local input=$2
    local max_retries=3
    local attempt=0

    while [ $attempt -lt $max_retries ]; do
        response=$(curl -s -w "\n%{http_code}" -X POST \
            "http://localhost:8080/api/v1/execute/async/${target}" \
            -H "Content-Type: application/json" \
            -d "${input}")

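        # Last line is the status code appended by -w; everything before it is the body
        http_code=$(echo "$response" | tail -n1)
        # sed '$d' drops the last line and, unlike head -n-1, also works with BSD/macOS tools
        body=$(echo "$response" | sed '$d')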

        if [ "$http_code" = "202" ]; then
            echo "$body" | jq -r '.execution_id'
            return 0
        elif [ "$http_code" = "429" ]; then
            wait_time=$((2 ** attempt))
            echo "Queue full, retrying in ${wait_time}s..." >&2
            sleep $wait_time
            attempt=$((attempt + 1))
        else
            echo "Error: HTTP $http_code" >&2
            return 1
        fi
    done

    echo "Failed after $max_retries attempts" >&2
    return 1
}

# Usage
execution_id=$(submit_with_retry \
    "research-agent.deep_analysis" \
    '{"input": {"topic": "market analysis"}}')

echo "Queued: $execution_id"

Rust Example with Retry:

use serde_json::json;
use tokio::time::{sleep, Duration};

async fn submit_with_retry(
    client: &reqwest::Client,
    target: &str,
    input: serde_json::Value,
    max_retries: u32,
) -> Result<String, Box<dyn std::error::Error>> {
    for attempt in 0..max_retries {
        let response = client
            .post(format!("http://localhost:8080/api/v1/execute/async/{}", target))
            .json(&json!({"input": input}))
            .send()
            .await?;

        match response.status().as_u16() {
            202 => {
                let result: serde_json::Value = response.json().await?;
                return Ok(result["execution_id"].as_str().unwrap().to_string());
            }
            429 if attempt < max_retries - 1 => {
                let wait_time = 2_u64.pow(attempt);
                eprintln!("Queue full, retrying in {}s...", wait_time);
                sleep(Duration::from_secs(wait_time)).await;
            }
            code => {
                return Err(format!("HTTP error: {}", code).into());
            }
        }
    }

    Err("Failed after max retries".into())
}
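For comparison, here is the same retry policy as a Python sketch: retry only on 429, wait 1s, 2s, 4s and so on between attempts, and fail fast on any other status. The submit argument is a placeholder for your own function that performs the POST and returns the HTTP status code together with the parsed body; it is not an Agentfield SDK call.

```python
import time


def submit_with_retry(submit, max_retries=3, sleep=time.sleep):
    """Retry a submission while the queue is full (HTTP 429).

    submit: callable with no arguments returning (status_code, body_dict).
    Returns the execution_id from the first 202 response.
    """
    for attempt in range(max_retries):
        status_code, body = submit()
        if status_code == 202:
            return body["execution_id"]
        if status_code != 429:
            # Anything other than "queue full" is not retried
            raise RuntimeError(f"HTTP error: {status_code}")
        if attempt < max_retries - 1:
            wait_time = 2 ** attempt  # 1s, 2s, 4s, ...
            sleep(wait_time)
    raise RuntimeError(f"Queue still full after {max_retries} attempts")
```

With many clients retrying at once, adding a small random jitter to wait_time is a common refinement that spreads the retries out.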

JavaScript/TypeScript

Use the HTTP API directly from JavaScript applications:

// Submit async execution
const response = await fetch(
  "http://localhost:8080/api/v1/execute/async/research-agent.deep_analysis",
  {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "X-Session-ID": "session_123",
    },
    body: JSON.stringify({
      input: {
        topic: "autonomous software trends",
        depth: "comprehensive",
      },
      webhook: {
        url: "https://app.example.com/webhooks/agentfield",
        secret: "your-webhook-secret",
      },
    }),
  }
);

const { execution_id, webhook_registered, webhook_error } = await response.json();
console.log("Queued:", execution_id);
console.log("Webhook registered:", webhook_registered);
if (!webhook_registered && webhook_error) {
  console.warn("Webhook registration error:", webhook_error);
}

// Poll for status
async function pollStatus(executionId) {
  const response = await fetch(
    `http://localhost:8080/api/v1/executions/${executionId}`
  );
  const status = await response.json();

  if (status.status === "succeeded") {
    console.log("Result:", status.result);
    return status.result;
  }

  if (["failed", "cancelled", "timeout"].includes(status.status)) {
    throw new Error(status.error ?? "Execution did not complete");
  }

  // Poll again after delay
  await new Promise((resolve) => setTimeout(resolve, 5000));
  return pollStatus(executionId);
}

// Wait for completion
const result = await pollStatus(execution_id);