Multi-agent failure handling: timeouts, partial outputs, and recovery patterns
In brief
Agents fail differently than APIs. When a sub-agent times out halfway through a pipeline, you don't just get an error — you get partial state. The patterns that make multi-agent systems actually recover.
Single-agent failures are straightforward: the call fails, you retry or surface the error. Multi-agent failures are not. When agent A calls agent B which calls agent C, and agent C times out, you have partial state in B, a hanging call in A, and a user waiting for output that will never arrive cleanly.
Most multi-agent implementations handle the happy path well. The failure handling is where they break in production.
The failure modes that actually happen
Timeout mid-pipeline. Agent B is waiting on agent C, which is doing something expensive (a web search, a large document analysis). After 30 seconds, the caller times out. Agent C may still be running. Now you have a partially completed pipeline with no clean way to resume.
Partial output from a sub-agent. Agent C returns, but its output is incomplete — it truncated because of max_tokens, it returned a partial JSON object, or it returned an error message embedded in the output text rather than as a proper error. The orchestrator receives something that looks like a result but is not.
Cascading failures. Agent A orchestrates agents B, C, and D in parallel. D fails. A does not know whether to wait for B and C, discard their results, or try to complete the task with partial data.
Silent degradation. A sub-agent does not fail hard — it returns a low-quality output that passes validation but produces a bad final result. This is the hardest failure mode to catch.
State inconsistency. Agent B writes to a database halfway through its task, then fails. The write committed. The task did not complete. Downstream agents working from that database now have inconsistent state.
The foundational pattern: checkpoints and idempotency
The most important architectural decision for resilient multi-agent systems is making sub-agent operations idempotent — safe to retry without side effects.
```python
import hashlib
import json
import time
from typing import Optional

import anthropic

client = anthropic.Anthropic()

def generate_task_id(task_input: dict) -> str:
    """Deterministic ID from task inputs — same inputs = same ID = safe to retry."""
    serialized = json.dumps(task_input, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()[:16]

class AgentTask:
    def __init__(self, task_id: str, task_input: dict):
        self.task_id = task_id
        self.task_input = task_input
        self.result: Optional[str] = None
        self.status: str = "pending"  # pending | running | complete | failed
        self.attempts: int = 0

    def run(self, prompt: str, max_retries: int = 3) -> str:
        for attempt in range(max_retries):
            self.attempts += 1
            self.status = "running"
            try:
                response = client.messages.create(
                    model="claude-sonnet-4-6",
                    max_tokens=2048,
                    messages=[{"role": "user", "content": prompt}]
                )
                self.result = response.content[0].text
                self.status = "complete"
                return self.result
            except anthropic.APITimeoutError:
                if attempt == max_retries - 1:
                    self.status = "failed"
                    raise
                time.sleep(2 ** attempt)  # Exponential backoff before retry
            except anthropic.APIStatusError as e:
                if e.status_code == 529 and attempt < max_retries - 1:  # Overloaded: retry
                    time.sleep(2 ** attempt)
                else:
                    self.status = "failed"
                    raise
        raise RuntimeError("unreachable: the loop always returns or raises")
```
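To actually exploit the deterministic ID, keep a registry of completed results keyed by task ID and consult it before running. A minimal in-memory sketch (the `run_once` helper and `_results` cache are illustrative, not part of the class above; `generate_task_id` is repeated so the snippet stands alone):

```python
import hashlib
import json
from typing import Callable

def generate_task_id(task_input: dict) -> str:
    serialized = json.dumps(task_input, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()[:16]

# Completed results keyed by task ID. Same inputs -> same ID, so a
# retry finds the cached result instead of re-running the agent.
_results: dict[str, str] = {}

def run_once(task_input: dict, agent_fn: Callable[[dict], str]) -> str:
    task_id = generate_task_id(task_input)
    if task_id in _results:
        return _results[task_id]   # already completed; skip the call
    result = agent_fn(task_input)  # the expensive agent call
    _results[task_id] = result
    return result
```

In production the registry would live in a shared store (Redis, Postgres) so concurrent workers deduplicate against each other, not just against their own process.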
Timeout handling with fallback strategies
For pipelines where one agent is on the critical path, design explicit fallback strategies before you need them.
```python
import asyncio
from typing import Callable, TypeVar

T = TypeVar('T')

async def with_timeout(
    coro,
    timeout_seconds: float,
    fallback: Callable[[], T],
    task_name: str = "task"
) -> T:
    """Run a coroutine with timeout; call fallback on expiry."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        print(f"[{task_name}] timed out after {timeout_seconds}s — using fallback")
        return fallback()

# Three fallback strategies:

# 1. Return a partial result with a flag
async def research_agent_with_fallback(query: str) -> dict:
    async def do_research():
        # ... full research agent call
        return {"result": "...", "complete": True}

    def partial_fallback():
        return {
            "result": f"Research for '{query}' was incomplete due to timeout. Available context only.",
            "complete": False,
        }

    return await with_timeout(do_research(), timeout_seconds=30, fallback=partial_fallback, task_name="research")

# 2. Use a cheaper/faster model for the fallback
async def analysis_with_model_fallback(content: str) -> str:
    try:
        # Try Sonnet first (better quality)
        response = await asyncio.wait_for(
            asyncio.to_thread(
                client.messages.create,
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": content}]
            ),
            timeout=20
        )
        return response.content[0].text
    except asyncio.TimeoutError:
        # Fall back to Haiku (faster); run in a thread so the event loop stays unblocked
        response = await asyncio.to_thread(
            client.messages.create,
            model="claude-haiku-4-5-20251001",
            max_tokens=512,
            messages=[{"role": "user", "content": f"Briefly: {content}"}]
        )
        return f"[Abbreviated] {response.content[0].text}"

# 3. Skip the agent and continue with available data
async def optional_enrichment(base_result: dict) -> dict:
    async def enrich():
        # enrichment call that might time out
        return {**base_result, "enriched": True, "extra_data": "..."}

    def skip_enrichment():
        return {**base_result, "enriched": False}

    return await with_timeout(enrich(), timeout_seconds=15, fallback=skip_enrichment, task_name="enrichment")
```
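To see the fallback path fire without a real agent, race `with_timeout` against a deliberately slow coroutine. A standalone demo (the helper is repeated here, minus the log line, so the snippet runs on its own):

```python
import asyncio
from typing import Callable, TypeVar

T = TypeVar("T")

async def with_timeout(coro, timeout_seconds: float, fallback: Callable[[], T]) -> T:
    # Same helper as above, without the log line
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return fallback()

async def slow_agent():
    await asyncio.sleep(1.0)  # stands in for an expensive agent call
    return "full result"

# 0.05s budget vs a 1.0s task: the timeout wins and the fallback is used
result = asyncio.run(with_timeout(slow_agent(), 0.05, lambda: "fallback result"))
print(result)  # fallback result
```

Note that `asyncio.wait_for` cancels the coroutine on timeout, so the slow task does not keep running in the background.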
Parsing and validating sub-agent outputs
Partial outputs are harder to catch than failures. A sub-agent that returns truncated JSON looks like a success to the caller.
```python
import json
from dataclasses import dataclass
from typing import Any

@dataclass
class AgentOutput:
    raw: str
    parsed: Any
    is_valid: bool
    validation_errors: list[str]

def validate_agent_output(raw_output: str, expected_schema: dict) -> AgentOutput:
    """Validate that output matches expected structure before passing downstream."""
    errors = []

    # Check 1: Is it valid JSON if we expected JSON?
    parsed = None
    if expected_schema.get("type") == "json":
        try:
            parsed = json.loads(raw_output)
        except json.JSONDecodeError as e:
            errors.append(f"JSON parse failed: {e}")
            return AgentOutput(raw=raw_output, parsed=None, is_valid=False, validation_errors=errors)

    # Check 2: Required fields present (only meaningful once JSON parsed)
    if parsed is not None:
        for field in expected_schema.get("required", []):
            if field not in parsed:
                errors.append(f"Missing required field: {field}")

    # Check 3: Output not suspiciously short (truncation signal)
    min_length = expected_schema.get("min_length", 0)
    if len(raw_output) < min_length:
        errors.append(f"Output suspiciously short ({len(raw_output)} chars, expected ≥ {min_length})")

    # Check 4: Stop reason was 'end_turn' not 'max_tokens'
    # (Pass stop_reason from the API response, not just the content)
    if expected_schema.get("_stop_reason") == "max_tokens":
        errors.append("Output was truncated at max_tokens — likely incomplete")

    return AgentOutput(
        raw=raw_output,
        parsed=parsed,
        is_valid=len(errors) == 0,
        validation_errors=errors
    )
```
Always check stop_reason in the API response — max_tokens is the signal that output was truncated. Pass it alongside the content, don't discard it.
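One wiring detail the validator leaves implicit: `_stop_reason` has to be copied from the API response into the schema at the call site. A small helper sketch (the name `schema_with_stop_reason` is illustrative, not from the code above):

```python
def schema_with_stop_reason(expected: dict, stop_reason: str) -> dict:
    """Attach the response's stop_reason to the expected-output schema
    so the validator can flag max_tokens truncation."""
    return {**expected, "_stop_reason": stop_reason}

# At the call site (sketch):
#   response = client.messages.create(...)
#   schema = schema_with_stop_reason({"type": "json", "required": ["summary"]},
#                                    response.stop_reason)
#   output = validate_agent_output(response.content[0].text, schema)
```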
Parallel agent coordination with partial failure
When multiple agents run in parallel and one fails, you need a policy for the others.
```python
import asyncio
from enum import Enum
from typing import Optional

class FailurePolicy(Enum):
    FAIL_ALL = "fail_all"        # If any agent fails, fail the whole task
    BEST_EFFORT = "best_effort"  # Return whatever succeeded, mark what failed
    REQUIRE_QUORUM = "quorum"    # Require N of M agents to succeed

async def run_parallel_agents(
    tasks: list[dict],
    policy: FailurePolicy = FailurePolicy.BEST_EFFORT,
    quorum: Optional[int] = None
) -> dict:
    """Run agents in parallel with configurable failure policy."""

    async def run_single(task: dict) -> dict:
        try:
            result = await asyncio.to_thread(
                client.messages.create,
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": task["prompt"]}]
            )
            return {"id": task["id"], "success": True, "output": result.content[0].text}
        except Exception as e:
            return {"id": task["id"], "success": False, "error": str(e)}

    results = await asyncio.gather(*[run_single(t) for t in tasks])
    successes = [r for r in results if r["success"]]
    failures = [r for r in results if not r["success"]]

    if policy == FailurePolicy.FAIL_ALL and failures:
        raise RuntimeError(f"{len(failures)} agents failed: {[f['error'] for f in failures]}")

    if policy == FailurePolicy.REQUIRE_QUORUM:
        required = quorum or (len(tasks) // 2 + 1)  # default: simple majority
        if len(successes) < required:
            raise RuntimeError(f"Quorum not met: {len(successes)}/{required} agents succeeded")

    return {
        "results": successes,
        "failures": failures,
        "complete": len(failures) == 0
    }
```
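The policy logic above is pure once the API calls are out of the way, which makes it unit-testable without network access. A sketch that separates the decision from the calls (this refactor and the plain-string policy names are illustrative, not from the code above):

```python
from typing import Optional

def apply_policy(results: list[dict], policy: str = "best_effort",
                 quorum: Optional[int] = None) -> dict:
    """Decide what to do with a mix of successes and failures.
    Policies mirror FailurePolicy above: fail_all, best_effort, quorum."""
    successes = [r for r in results if r["success"]]
    failures = [r for r in results if not r["success"]]
    if policy == "fail_all" and failures:
        raise RuntimeError(f"{len(failures)} agents failed")
    if policy == "quorum":
        required = quorum or (len(results) // 2 + 1)  # default: simple majority
        if len(successes) < required:
            raise RuntimeError(f"Quorum not met: {len(successes)}/{required}")
    return {"results": successes, "failures": failures, "complete": not failures}
```

With the decision isolated, the orchestrator shrinks to "gather results, then `apply_policy`", and edge cases like quorum-of-one can be tested directly.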
State management: write-ahead logging
For pipelines that write to external systems (databases, APIs), use write-ahead logging to track what has committed so that retries do not cause double-writes.
```python
from datetime import datetime, timezone
from typing import Any

class PipelineState:
    """Tracks committed side effects so retries are safe."""

    def __init__(self, pipeline_id: str):
        self.pipeline_id = pipeline_id
        self.committed_operations: dict[str, Any] = {}
        self.created_at = datetime.now(timezone.utc)

    def has_committed(self, operation_key: str) -> bool:
        return operation_key in self.committed_operations

    def mark_committed(self, operation_key: str, result: Any):
        self.committed_operations[operation_key] = {
            "result": result,
            "committed_at": datetime.now(timezone.utc).isoformat()
        }

    def get_committed(self, operation_key: str) -> Any:
        return self.committed_operations.get(operation_key, {}).get("result")

# Usage in a pipeline (run_agent_step is the pipeline's own agent-call helper)
async def idempotent_pipeline(input_data: dict) -> dict:
    pipeline_id = generate_task_id(input_data)
    state = PipelineState(pipeline_id)

    # Step 1 — only run if not already committed
    step1_key = f"{pipeline_id}:step1"
    if not state.has_committed(step1_key):
        result1 = await run_agent_step(input_data, step="analyze")
        state.mark_committed(step1_key, result1)
    else:
        result1 = state.get_committed(step1_key)

    # Step 2
    step2_key = f"{pipeline_id}:step2"
    if not state.has_committed(step2_key):
        result2 = await run_agent_step(result1, step="synthesize")
        state.mark_committed(step2_key, result2)
    else:
        result2 = state.get_committed(step2_key)

    return result2
```
For production, persist PipelineState to Redis or Postgres so it survives process restarts.
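As a sketch of that persistence, here is the same interface backed by SQLite (the class name and table schema are illustrative; the Redis or Postgres variants follow the same shape):

```python
import json
import sqlite3
from typing import Any

class PersistentPipelineState:
    """PipelineState with committed operations stored in SQLite,
    so they survive process restarts."""

    def __init__(self, pipeline_id: str, db_path: str = "pipeline_state.db"):
        self.pipeline_id = pipeline_id
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS committed_ops ("
            " pipeline_id TEXT, op_key TEXT, result TEXT,"
            " PRIMARY KEY (pipeline_id, op_key))"
        )

    def has_committed(self, op_key: str) -> bool:
        row = self.conn.execute(
            "SELECT 1 FROM committed_ops WHERE pipeline_id=? AND op_key=?",
            (self.pipeline_id, op_key),
        ).fetchone()
        return row is not None

    def mark_committed(self, op_key: str, result: Any) -> None:
        self.conn.execute(
            "INSERT OR REPLACE INTO committed_ops VALUES (?, ?, ?)",
            (self.pipeline_id, op_key, json.dumps(result)),
        )
        self.conn.commit()

    def get_committed(self, op_key: str) -> Any:
        row = self.conn.execute(
            "SELECT result FROM committed_ops WHERE pipeline_id=? AND op_key=?",
            (self.pipeline_id, op_key),
        ).fetchone()
        return json.loads(row[0]) if row else None
```

The composite primary key makes `mark_committed` idempotent per pipeline and step, which is exactly the property the retry logic depends on.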
The silent degradation problem
The hardest failure mode is an agent that succeeds but produces bad output. No error thrown, but the downstream result is wrong.
The only defense is evaluating outputs before trusting them downstream. For critical pipeline steps, add a lightweight quality check:
```python
def is_output_plausibly_correct(output: str, task_context: dict) -> tuple[bool, str]:
    """Quick sanity check before passing output downstream."""
    # Check 1: Contains expected entities
    for entity in task_context.get("required_entities", []):
        if entity.lower() not in output.lower():
            return False, f"Expected entity missing: {entity}"

    # Check 2: Not a refusal or error message
    refusal_signals = ["I cannot", "I'm unable to", "I don't have access", "As an AI"]
    for signal in refusal_signals:
        if signal in output:
            return False, f"Output appears to be a refusal: '{signal}'"

    # Check 3: Length within expected range (measured in words)
    min_words = task_context.get("min_output_length", 50)
    if len(output.split()) < min_words:
        return False, f"Output too short: {len(output.split())} words"

    return True, "ok"
```
This will not catch subtle quality issues, but it catches the obvious failures that otherwise silently corrupt downstream steps.
Related:
- Multi-agent orchestration basics — the foundational patterns before you get to failure handling
- Evaluating multi-agent systems — how to measure whether your pipeline is working
Further reading
- Multi-agent coordination patterns — five approaches and when to use them
- Building effective agents — foundational patterns for agent reliability