AI Codex

Multi-agent failure handling: timeouts, partial outputs, and recovery patterns

In brief

Agents fail differently than APIs. When a sub-agent times out halfway through a pipeline, you don't just get an error — you get partial state. This article covers the patterns that make multi-agent systems actually recover.



Single-agent failures are straightforward: the call fails, you retry or surface the error. Multi-agent failures are not. When agent A calls agent B which calls agent C, and agent C times out, you have partial state in B, a hanging call in A, and a user waiting for output that will never arrive cleanly.

Most multi-agent implementations handle the happy path well. The failure handling is where they break in production.

The failure modes that actually happen

Timeout mid-pipeline. Agent B is waiting on agent C, which is doing something expensive (a web search, a large document analysis). After 30 seconds, the caller times out. Agent C may still be running. Now you have a partially completed pipeline with no clean way to resume.

Partial output from a sub-agent. Agent C returns, but its output is incomplete — it truncated because of max_tokens, it returned a partial JSON object, or it returned an error message embedded in the output text rather than as a proper error. The orchestrator receives something that looks like a result but is not.

Cascading failures. Agent A orchestrates agents B, C, and D in parallel. D fails. A does not know whether to wait for B and C, discard their results, or try to complete the task with partial data.

Silent degradation. A sub-agent does not fail hard — it returns a low-quality output that passes validation but produces a bad final result. This is the hardest failure mode to catch.

State inconsistency. Agent B writes to a database halfway through its task, then fails. The write committed. The task did not complete. Downstream agents working from that database now have inconsistent state.

The foundational pattern: checkpoints and idempotency

The most important architectural decision for resilient multi-agent systems is making sub-agent operations idempotent — safe to retry without side effects.

import hashlib
import json
import time
from typing import Any, Optional

import anthropic

client = anthropic.Anthropic()

def generate_task_id(task_input: dict) -> str:
    """Deterministic ID from task inputs — same inputs = same ID = safe to retry."""
    serialized = json.dumps(task_input, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()[:16]

class AgentTask:
    def __init__(self, task_id: str, task_input: dict):
        self.task_id = task_id
        self.task_input = task_input
        self.result: Optional[str] = None
        self.status: str = "pending"  # pending | running | complete | failed
        self.attempts: int = 0

    def run(self, prompt: str, max_retries: int = 3) -> str:
        for attempt in range(max_retries):
            self.attempts += 1
            self.status = "running"
            try:
                response = client.messages.create(
                    model="claude-sonnet-4-6",
                    max_tokens=2048,
                    messages=[{"role": "user", "content": prompt}]
                )
                self.result = response.content[0].text
                self.status = "complete"
                return self.result
            except anthropic.APITimeoutError:
                if attempt == max_retries - 1:
                    self.status = "failed"
                    raise
                time.sleep(2 ** attempt)  # exponential backoff before retry
            except anthropic.APIStatusError as e:
                if e.status_code != 529 or attempt == max_retries - 1:
                    # non-retryable status, or 529 retries exhausted
                    self.status = "failed"
                    raise
                time.sleep(2 ** attempt)  # 529 Overloaded: back off and retry
        raise AssertionError("unreachable: every iteration returns or raises")
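The sort_keys=True in generate_task_id is what makes the ID stable: logically identical inputs hash to the same ID regardless of dict key order. A quick check, restating the function so the snippet runs standalone:

```python
import hashlib
import json

def generate_task_id(task_input: dict) -> str:
    # sort_keys=True makes the serialization canonical, so key order
    # in the input dict does not change the resulting ID
    serialized = json.dumps(task_input, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()[:16]

a = generate_task_id({"step": "analyze", "document_id": "doc-42"})
b = generate_task_id({"document_id": "doc-42", "step": "analyze"})
assert a == b          # same logical task, same ID
assert len(a) == 16    # truncated hex digest
```

This is why a crashed-and-retried pipeline run maps back to the same task record instead of creating a duplicate.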

Timeout handling with fallback strategies

For pipelines where one agent is on the critical path, design explicit fallback strategies before you need them.

import asyncio
from typing import Callable, TypeVar

T = TypeVar('T')

async def with_timeout(
    coro,
    timeout_seconds: float,
    fallback: Callable[[], T],
    task_name: str = "task"
) -> T:
    """Run a coroutine with timeout; call fallback on expiry."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        print(f"[{task_name}] timed out after {timeout_seconds}s — using fallback")
        return fallback()

# Three fallback strategies:

# 1. Return a partial result with a flag
async def research_agent_with_fallback(query: str) -> dict:
    async def do_research():
        # ... full research agent call
        return {"result": "...", "complete": True}

    def partial_fallback():
        return {"result": f"Research for '{query}' was incomplete due to timeout. Available context only.", "complete": False}

    return await with_timeout(do_research(), timeout_seconds=30, fallback=partial_fallback, task_name="research")

# 2. Use a cheaper/faster model for the fallback
async def analysis_with_model_fallback(content: str) -> str:
    try:
        # Try Sonnet first (better quality). Note: on timeout, wait_for
        # stops waiting, but the thread started by to_thread keeps
        # running the request to completion in the background.
        response = await asyncio.wait_for(
            asyncio.to_thread(
                client.messages.create,
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": content}]
            ),
            timeout=20
        )
        return response.content[0].text
    except asyncio.TimeoutError:
        # Fall back to Haiku (faster)
        response = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=512,
            messages=[{"role": "user", "content": f"Briefly: {content}"}]
        )
        return f"[Abbreviated] {response.content[0].text}"

# 3. Skip the agent and continue with available data
async def optional_enrichment(base_result: dict) -> dict:
    async def enrich():
        # enrichment call that might time out
        return {**base_result, "enriched": True, "extra_data": "..."}

    def skip_enrichment():
        return {**base_result, "enriched": False}

    return await with_timeout(enrich(), timeout_seconds=15, fallback=skip_enrichment, task_name="enrichment")
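To see the fallback path fire, here is a self-contained demo. It restates a minimal with_timeout and uses a stubbed slow task in place of a real agent call:

```python
import asyncio

async def slow_task() -> str:
    await asyncio.sleep(5)  # stand-in for an expensive sub-agent call
    return "full result"

async def with_timeout(coro, timeout_seconds: float, fallback, task_name: str = "task"):
    try:
        # wait_for cancels the awaited coroutine on expiry; a task wrapped
        # in asyncio.to_thread keeps running in its thread regardless
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return fallback()

result = asyncio.run(
    with_timeout(slow_task(), timeout_seconds=0.1,
                 fallback=lambda: "fallback result", task_name="demo")
)
print(result)
```

The 0.1-second timeout expires long before the 5-second sleep finishes, so the caller gets the fallback value rather than hanging.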

Parsing and validating sub-agent outputs

Partial outputs are harder to catch than failures. A sub-agent that returns truncated JSON looks like a success to the caller.

import json
from dataclasses import dataclass
from typing import Any

@dataclass
class AgentOutput:
    raw: str
    parsed: Any
    is_valid: bool
    validation_errors: list[str]

def validate_agent_output(raw_output: str, expected_schema: dict) -> AgentOutput:
    """Validate that output matches expected structure before passing downstream."""
    errors = []

    # Check 1: Is it valid JSON if we expected JSON?
    parsed = None
    if expected_schema.get("type") == "json":
        try:
            parsed = json.loads(raw_output)
        except json.JSONDecodeError as e:
            errors.append(f"JSON parse failed: {e}")
            return AgentOutput(raw=raw_output, parsed=None, is_valid=False, validation_errors=errors)

    # Check 2: Required fields present (only meaningful for parsed JSON objects)
    required_fields = expected_schema.get("required", [])
    if isinstance(parsed, dict):
        for field in required_fields:
            if field not in parsed:
                errors.append(f"Missing required field: {field}")
    elif required_fields:
        errors.append("Required fields specified but output is not a JSON object")

    # Check 3: Output not suspiciously short (truncation signal)
    min_length = expected_schema.get("min_length", 0)
    if len(raw_output) < min_length:
        errors.append(f"Output suspiciously short ({len(raw_output)} chars, expected ≥ {min_length})")

    # Check 4: Stop reason was 'end_turn' not 'max_tokens'
    # (Pass stop_reason from the API response, not just the content)
    stop_reason = expected_schema.get("_stop_reason")
    if stop_reason == "max_tokens":
        errors.append("Output was truncated at max_tokens — likely incomplete")

    return AgentOutput(
        raw=raw_output,
        parsed=parsed,
        is_valid=len(errors) == 0,
        validation_errors=errors
    )

Always check stop_reason in the API response — max_tokens is the signal that output was truncated. Pass it alongside the content, don't discard it.
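As a sketch of that advice, a small standalone helper (the function name is mine, not from the SDK) collects the two most common truncation signals before any schema validation runs:

```python
import json

def truncation_errors(stop_reason: str, raw_output: str) -> list[str]:
    """Collect truncation signals from a response before validation."""
    errors = []
    if stop_reason == "max_tokens":
        errors.append("Output truncated at max_tokens, likely incomplete")
    try:
        json.loads(raw_output)
    except json.JSONDecodeError:
        errors.append("Output is not valid JSON, possibly cut mid-object")
    return errors

# A truncated response trips both checks:
print(truncation_errors("max_tokens", '{"summary": "partial'))
# A clean response trips neither:
print(truncation_errors("end_turn", '{"summary": "done"}'))
```

In the Anthropic SDK, stop_reason is a top-level field on the response object, so this costs one attribute read at the call site.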

Parallel agent coordination with partial failure

When multiple agents run in parallel and one fails, you need a policy for the others.

import asyncio
from enum import Enum
from typing import Optional

class FailurePolicy(Enum):
    FAIL_ALL = "fail_all"       # If any agent fails, fail the whole task
    BEST_EFFORT = "best_effort" # Return whatever succeeded, mark what failed
    REQUIRE_QUORUM = "quorum"   # Require N of M agents to succeed

async def run_parallel_agents(
    tasks: list[dict],
    policy: FailurePolicy = FailurePolicy.BEST_EFFORT,
    quorum: Optional[int] = None
) -> dict:
    """Run agents in parallel with configurable failure policy."""

    async def run_single(task: dict) -> dict:
        try:
            result = await asyncio.to_thread(
                client.messages.create,
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": task["prompt"]}]
            )
            return {"id": task["id"], "success": True, "output": result.content[0].text}
        except Exception as e:
            return {"id": task["id"], "success": False, "error": str(e)}

    results = await asyncio.gather(*[run_single(t) for t in tasks])

    successes = [r for r in results if r["success"]]
    failures = [r for r in results if not r["success"]]

    if policy == FailurePolicy.FAIL_ALL and failures:
        raise RuntimeError(f"{len(failures)} agents failed: {[f['error'] for f in failures]}")

    if policy == FailurePolicy.REQUIRE_QUORUM:
        required = quorum or (len(tasks) // 2 + 1)
        if len(successes) < required:
            raise RuntimeError(f"Quorum not met: {len(successes)}/{required} agents succeeded")

    return {
        "results": successes,
        "failures": failures,
        "complete": len(failures) == 0
    }
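The quorum arithmetic is easy to get wrong, so here it is isolated with stubbed agents (flaky stands in for a real API call; all names are illustrative):

```python
import asyncio

async def flaky(i: int) -> dict:
    # stand-in for an agent call; odd-numbered tasks "fail"
    if i % 2 == 0:
        return {"id": i, "success": True, "output": f"result-{i}"}
    return {"id": i, "success": False, "error": "timeout"}

async def gather_with_quorum(n: int) -> dict:
    results = await asyncio.gather(*[flaky(i) for i in range(n)])
    successes = [r for r in results if r["success"]]
    required = n // 2 + 1  # simple-majority quorum
    if len(successes) < required:
        raise RuntimeError(f"Quorum not met: {len(successes)}/{required}")
    return {"results": successes, "complete": len(successes) == n}

summary = asyncio.run(gather_with_quorum(5))
print(summary["complete"], len(summary["results"]))
```

With five tasks, three succeed and the simple majority of three is met, so the call returns partial results with complete set to False instead of raising.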

State management: write-ahead logging

For pipelines that write to external systems (databases, APIs), use write-ahead logging to track what has committed so that retries do not cause double-writes.

from datetime import datetime, timezone
from typing import Any

class PipelineState:
    """Tracks committed side effects so retries are safe."""

    def __init__(self, pipeline_id: str):
        self.pipeline_id = pipeline_id
        self.committed_operations: dict[str, Any] = {}
        self.created_at = datetime.now(timezone.utc)

    def has_committed(self, operation_key: str) -> bool:
        return operation_key in self.committed_operations

    def mark_committed(self, operation_key: str, result: Any):
        self.committed_operations[operation_key] = {
            "result": result,
            "committed_at": datetime.now(timezone.utc).isoformat()
        }

    def get_committed(self, operation_key: str) -> Any:
        return self.committed_operations.get(operation_key, {}).get("result")

# Usage in a pipeline (run_agent_step is a placeholder for your agent call;
# in practice, load PipelineState from persistent storage — a fresh in-memory
# instance means the has_committed checks never fire on a retried run)
async def idempotent_pipeline(input_data: dict) -> dict:
    pipeline_id = generate_task_id(input_data)
    state = PipelineState(pipeline_id)

    # Step 1 — only run if not already committed
    step1_key = f"{pipeline_id}:step1"
    if not state.has_committed(step1_key):
        result1 = await run_agent_step(input_data, step="analyze")
        state.mark_committed(step1_key, result1)
    else:
        result1 = state.get_committed(step1_key)

    # Step 2
    step2_key = f"{pipeline_id}:step2"
    if not state.has_committed(step2_key):
        result2 = await run_agent_step(result1, step="synthesize")
        state.mark_committed(step2_key, result2)
    else:
        result2 = state.get_committed(step2_key)

    return result2

For production, persist PipelineState to Redis or Postgres so it survives process restarts.
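As a sketch of what durable state looks like, here is a SQLite-backed variant with the same interface (SQLite chosen only because it ships with Python; Redis or Postgres would follow the same shape):

```python
import json
import sqlite3

class PersistentPipelineState:
    """Durable variant of PipelineState backed by SQLite."""

    def __init__(self, pipeline_id: str, db_path: str = ":memory:"):
        self.pipeline_id = pipeline_id
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS committed_ops "
            "(pipeline_id TEXT, op_key TEXT, result TEXT, "
            "PRIMARY KEY (pipeline_id, op_key))"
        )

    def has_committed(self, op_key: str) -> bool:
        row = self.conn.execute(
            "SELECT 1 FROM committed_ops WHERE pipeline_id=? AND op_key=?",
            (self.pipeline_id, op_key),
        ).fetchone()
        return row is not None

    def mark_committed(self, op_key: str, result) -> None:
        # INSERT OR IGNORE gives first-write-wins semantics: a retry
        # never overwrites a result that already committed
        self.conn.execute(
            "INSERT OR IGNORE INTO committed_ops VALUES (?, ?, ?)",
            (self.pipeline_id, op_key, json.dumps(result)),
        )
        self.conn.commit()

    def get_committed(self, op_key: str):
        row = self.conn.execute(
            "SELECT result FROM committed_ops WHERE pipeline_id=? AND op_key=?",
            (self.pipeline_id, op_key),
        ).fetchone()
        return json.loads(row[0]) if row else None
```

Pass a real file path instead of ":memory:" and the commit log survives process restarts, which is the whole point of the write-ahead pattern.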

The silent degradation problem

The hardest failure mode is an agent that succeeds but produces bad output. No error thrown, but the downstream result is wrong.

The only defense is evaluating outputs before trusting them downstream. For critical pipeline steps, add a lightweight quality check:

def is_output_plausibly_correct(output: str, task_context: dict) -> tuple[bool, str]:
    """Quick sanity check before passing output downstream."""

    # Check 1: Contains expected entities
    expected_entities = task_context.get("required_entities", [])
    for entity in expected_entities:
        if entity.lower() not in output.lower():
            return False, f"Expected entity missing: {entity}"

    # Check 2: Not a refusal or error message
    refusal_signals = ["I cannot", "I'm unable to", "I don't have access", "As an AI"]
    for signal in refusal_signals:
        if signal in output:
            return False, f"Output appears to be a refusal: '{signal}'"

    # Check 3: Length within expected range
    min_len = task_context.get("min_output_length", 50)
    if len(output.split()) < min_len:
        return False, f"Output too short: {len(output.split())} words"

    return True, "ok"

This will not catch subtle quality issues, but it catches the obvious failures that otherwise silently corrupt downstream steps.


Related: Multi-agent orchestration basics — the foundational patterns before you get to failure handling. Evaluating multi-agent systems — how to measure whether your pipeline is working.
