I’ll write this article directly as it’s a content generation task with clear requirements.
Knowing how to evaluate AI agent performance: metrics and frameworks is the hardest part of moving an agent from prototype to production. Unlike a traditional API endpoint where latency and error rate tell most of the story, agents execute multi-step reasoning loops, call external tools, and produce open-ended outputs — making evaluation a multi-dimensional engineering problem. This guide gives you a complete evaluation stack: the right metrics, a reproducible test harness, and a continuous monitoring pipeline you can wire into CI/CD today.
The Evaluation Challenge: Why Standard Metrics Fall Short
Determinism is the enemy of agent evaluation. Run the same prompt twice and you may get different tool call sequences, different reasoning paths, and different final answers — all of which could be equally correct.
This forces us to evaluate along multiple independent axes:
| Axis | Question |
|---|---|
| Task success | Did the agent achieve the stated goal? |
| Efficiency | How many steps and tokens did it consume? |
| Faithfulness | Did it hallucinate or misuse retrieved context? |
| Robustness | Does it degrade gracefully on edge cases? |
| Latency | Is it fast enough for the user experience? |
| Cost | What did each successful task cost in USD? |
A naive approach — measuring only final-answer accuracy — misses everything in between. An agent that reaches the right answer via five unnecessary tool calls is expensive and brittle. An agent that calls the right tools in the wrong order is unreliable.
For multi-agent systems specifically (see Multi-Agent Architecture Topologies: Centralized vs Distributed), the evaluation surface multiplies — you must also measure inter-agent communication fidelity and delegation success rates.
Designing Your Evaluation Dataset
Before writing any code, you need a golden dataset: a curated collection of test cases with ground-truth labels.
Each test case must include:
- A task specification (user goal)
- Expected final answer or outcome (for exact-match tasks)
- Expected tool call sequence (ordered or unordered, depending on task type)
- A rubric for LLM-as-judge evaluation (for open-ended tasks)
Here is the schema used in this guide:
# eval/schema.py
from dataclasses import dataclass, field
from typing import Any
@dataclass
class ToolCall:
name: str
arguments: dict[str, Any]
@dataclass
class EvalCase:
id: str
task: str
expected_answer: str | None # None for rubric-only cases
expected_tool_calls: list[ToolCall] # ordered sequence; empty = unordered
rubric: str # natural-language criteria for judge
tags: list[str] = field(default_factory=list)
# Example
GOLDEN_DATASET: list[EvalCase] = [
EvalCase(
id="search-then-summarize-001",
task="Find the top 3 Python libraries for building AI agents in 2026 and summarize their key differences.",
expected_answer=None,
expected_tool_calls=[
ToolCall("web_search", {"query": "top Python AI agent frameworks 2026"}),
ToolCall("summarize", {"format": "bullet_points"}),
],
rubric=(
"The response must name at least 3 distinct libraries, "
"describe at least one distinguishing feature per library, "
"and avoid hallucinating version numbers or GitHub statistics."
),
tags=["search", "summarization", "intermediate"],
),
]
Keep golden datasets in version control. Treat them like test fixtures — review and extend them every sprint.
Implementing the Core Metrics Collector
The metrics collector intercepts every event the agent emits and accumulates structured data for each run.
# eval/collector.py
import time
import uuid
from dataclasses import dataclass, field
from typing import Any
@dataclass
class ToolCallRecord:
name: str
arguments: dict[str, Any]
result: Any
latency_ms: float
error: str | None = None
@dataclass
class RunMetrics:
run_id: str = field(default_factory=lambda: str(uuid.uuid4()))
case_id: str = ""
task: str = ""
final_answer: str = ""
tool_calls: list[ToolCallRecord] = field(default_factory=list)
total_tokens: int = 0
prompt_tokens: int = 0
completion_tokens: int = 0
total_latency_ms: float = 0.0
estimated_cost_usd: float = 0.0
success: bool | None = None # filled in by scorer
class MetricsCollector:
"""Wraps an agent executor and intercepts tool calls."""
COST_PER_1K = {"input": 0.003, "output": 0.015} # example pricing
def __init__(self, agent_executor):
self.agent = agent_executor
def run(self, case: "EvalCase") -> RunMetrics:
metrics = RunMetrics(case_id=case.id, task=case.task)
t0 = time.perf_counter()
# Patch tool calls to record latency and output
original_tools = {t.name: t for t in self.agent.tools}
patched_calls: list[ToolCallRecord] = []
def make_patched(tool):
def patched(*args, **kwargs):
ts = time.perf_counter()
try:
result = tool.func(*args, **kwargs)
patched_calls.append(ToolCallRecord(
name=tool.name,
arguments=kwargs,
result=result,
latency_ms=(time.perf_counter() - ts) * 1000,
))
return result
except Exception as exc:
patched_calls.append(ToolCallRecord(
name=tool.name,
arguments=kwargs,
result=None,
latency_ms=(time.perf_counter() - ts) * 1000,
error=str(exc),
))
raise
return patched
for tool in self.agent.tools:
tool.func = make_patched(tool)
# Execute agent
response = self.agent.invoke({"input": case.task})
metrics.final_answer = response.get("output", "")
metrics.total_latency_ms = (time.perf_counter() - t0) * 1000
metrics.tool_calls = patched_calls
# Token accounting (LangChain callback-based)
usage = response.get("usage", {})
metrics.prompt_tokens = usage.get("prompt_tokens", 0)
metrics.completion_tokens = usage.get("completion_tokens", 0)
metrics.total_tokens = metrics.prompt_tokens + metrics.completion_tokens
metrics.estimated_cost_usd = (
metrics.prompt_tokens / 1000 * self.COST_PER_1K["input"]
+ metrics.completion_tokens / 1000 * self.COST_PER_1K["output"]
)
return metrics
This pattern is framework-agnostic. Swap the agent_executor for a LangChain agent, a CrewAI crew, or any callable that emits tool calls.
Scoring: Automated Metrics and LLM-as-Judge
Once you have raw run data, you score it on two tracks in parallel.
Track 1: Deterministic Scoring (fast, cheap, exact)
# eval/scorer.py
from difflib import SequenceMatcher
from eval.schema import EvalCase, ToolCall
from eval.collector import RunMetrics
def score_tool_calls(
expected: list[ToolCall], actual: list["ToolCallRecord"]
) -> dict[str, float]:
"""Returns precision, recall, F1 over tool names (order-independent)."""
expected_names = [t.name for t in expected]
actual_names = [t.name for t in actual]
if not expected_names:
return {"precision": 1.0, "recall": 1.0, "f1": 1.0}
tp = sum(1 for n in actual_names if n in expected_names)
precision = tp / len(actual_names) if actual_names else 0.0
recall = tp / len(expected_names)
f1 = (2 * precision * recall / (precision + recall)) if (precision + recall) else 0.0
return {"precision": precision, "recall": recall, "f1": f1}
def score_exact_answer(expected: str | None, actual: str) -> float | None:
"""Fuzzy string match when an exact answer is known."""
if expected is None:
return None
return SequenceMatcher(None, expected.lower(), actual.lower()).ratio()
Track 2: LLM-as-Judge (slower, costlier, handles open-ended tasks)
# eval/judge.py
import anthropic
client = anthropic.Anthropic()
JUDGE_PROMPT = """
You are an impartial evaluator of AI agent outputs.
## Task given to the agent
{task}
## Agent's response
{response}
## Evaluation rubric
{rubric}
Score the response from 0 to 10 based on the rubric.
Return ONLY a JSON object with these keys:
- "score": integer 0–10
- "reason": one sentence explaining the score
- "pass": boolean (true if score >= 7)
""".strip()
def judge_response(case: EvalCase, metrics: RunMetrics) -> dict:
prompt = JUDGE_PROMPT.format(
task=case.task,
response=metrics.final_answer,
rubric=case.rubric,
)
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=256,
messages=[{"role": "user", "content": prompt}],
)
import json
return json.loads(response.content[0].text)
Note the use of claude-opus-4-6 as the judge — frontier-model judges produce more consistent and calibrated scores than smaller models. This follows the same principle as using a senior engineer to review code: the judge must be at least as capable as the system under test.
Building the Evaluation Pipeline
Wire everything together into a repeatable pipeline with a Mermaid view of the data flow:
flowchart TD
A[Golden Dataset] --> B[MetricsCollector]
B --> C{Run Agent}
C -->|tool calls| D[ToolCallRecord]
C -->|final answer| E[RunMetrics]
D --> F[Deterministic Scorer]
E --> F
E --> G[LLM-as-Judge]
F --> H[Score Aggregator]
G --> H
H --> I[Results JSON]
I --> J[CI Pass/Fail Gate]
I --> K[Dashboard / Alerting]
# eval/run_eval.py
import json
import argparse
from eval.schema import GOLDEN_DATASET
from eval.collector import MetricsCollector
from eval.scorer import score_tool_calls, score_exact_answer
from eval.judge import judge_response
from your_agent import build_agent # replace with your agent factory
def run_evaluation(tags: list[str] | None = None) -> dict:
agent = build_agent()
collector = MetricsCollector(agent)
cases = GOLDEN_DATASET
if tags:
cases = [c for c in cases if any(t in c.tags for t in tags)]
results = []
for case in cases:
print(f" Running case: {case.id}")
metrics = collector.run(case)
tool_scores = score_tool_calls(case.expected_tool_calls, metrics.tool_calls)
exact_score = score_exact_answer(case.expected_answer, metrics.final_answer)
judge = judge_response(case, metrics)
result = {
"case_id": case.id,
"run_id": metrics.run_id,
"tool_f1": tool_scores["f1"],
"exact_match": exact_score,
"judge_score": judge["score"],
"judge_pass": judge["pass"],
"judge_reason": judge["reason"],
"total_tokens": metrics.total_tokens,
"latency_ms": metrics.total_latency_ms,
"cost_usd": metrics.estimated_cost_usd,
"tool_call_count": len(metrics.tool_calls),
"errors": sum(1 for t in metrics.tool_calls if t.error),
}
results.append(result)
print(f" judge_score={judge['score']}/10 latency={metrics.total_latency_ms:.0f}ms")
# Aggregate
total = len(results)
summary = {
"total_cases": total,
"judge_pass_rate": sum(r["judge_pass"] for r in results) / total,
"avg_tool_f1": sum(r["tool_f1"] for r in results) / total,
"avg_latency_ms": sum(r["latency_ms"] for r in results) / total,
"avg_cost_usd": sum(r["cost_usd"] for r in results) / total,
"avg_tokens": sum(r["total_tokens"] for r in results) / total,
"results": results,
}
return summary
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--tags", nargs="*", help="Filter by tags")
parser.add_argument("--output", default="eval_results.json")
args = parser.parse_args()
summary = run_evaluation(tags=args.tags)
with open(args.output, "w") as f:
json.dump(summary, f, indent=2)
print(f"\n=== Evaluation Summary ===")
print(f"Pass rate: {summary['judge_pass_rate']:.1%}")
print(f"Tool F1: {summary['avg_tool_f1']:.3f}")
print(f"Avg latency: {summary['avg_latency_ms']:.0f} ms")
print(f"Avg cost: ${summary['avg_cost_usd']:.4f} / run")
# Fail CI if pass rate drops below threshold
if summary["judge_pass_rate"] < 0.80:
raise SystemExit("EVAL FAILED: judge pass rate below 80%")
Run this in CI with:
python eval/run_eval.py --tags intermediate --output eval_results.json
Set --tags to a subset of your golden dataset for fast smoke tests on each pull request, and run the full suite nightly.
Production Monitoring and Regression Detection
Evaluation suites run offline, but production traffic reveals edge cases your golden dataset never anticipated. Wire in production tracing using the same RunMetrics schema, then push aggregated metrics to a time-series store.
# eval/monitor.py
import sqlite3
import json
from datetime import datetime
from eval.collector import RunMetrics
DB_PATH = "scripts/data/agentscookbook.db"
def init_db(conn: sqlite3.Connection):
conn.execute("""
CREATE TABLE IF NOT EXISTS agent_runs (
run_id TEXT PRIMARY KEY,
case_id TEXT,
timestamp TEXT,
judge_score INTEGER,
tool_f1 REAL,
latency_ms REAL,
cost_usd REAL,
total_tokens INTEGER,
error_count INTEGER,
payload JSON
)
""")
conn.commit()
def log_run(metrics: RunMetrics, judge_score: int, tool_f1: float):
conn = sqlite3.connect(DB_PATH)
init_db(conn)
conn.execute(
"""INSERT OR REPLACE INTO agent_runs VALUES (?,?,?,?,?,?,?,?,?,?)""",
(
metrics.run_id,
metrics.case_id,
datetime.utcnow().isoformat(),
judge_score,
tool_f1,
metrics.total_latency_ms,
metrics.estimated_cost_usd,
metrics.total_tokens,
sum(1 for t in metrics.tool_calls if t.error),
json.dumps({"tool_calls": [t.name for t in metrics.tool_calls]}),
),
)
conn.commit()
conn.close()
def weekly_regression_report() -> dict:
"""Compare last 7 days vs prior 7 days."""
conn = sqlite3.connect(DB_PATH)
rows = conn.execute("""
SELECT
AVG(CASE WHEN timestamp >= date('now', '-7 days') THEN judge_score END) AS recent_score,
AVG(CASE WHEN timestamp < date('now', '-7 days')
AND timestamp >= date('now', '-14 days') THEN judge_score END) AS prior_score,
AVG(CASE WHEN timestamp >= date('now', '-7 days') THEN latency_ms END) AS recent_latency,
AVG(CASE WHEN timestamp >= date('now', '-7 days') THEN cost_usd END) AS recent_cost
FROM agent_runs
""").fetchone()
conn.close()
recent, prior, latency, cost = rows
return {
"recent_avg_score": round(recent or 0, 2),
"prior_avg_score": round(prior or 0, 2),
"score_delta": round((recent or 0) - (prior or 0), 2),
"recent_avg_latency_ms": round(latency or 0, 1),
"recent_avg_cost_usd": round(cost or 0, 5),
"regression_detected": (recent or 0) < (prior or 0) - 1.0,
}
Alert on regression_detected: true via your preferred notification channel (Slack webhook, PagerDuty, email). A score drop of more than 1 point on a 10-point scale is a meaningful signal worth investigating.
For multi-agent architectures where agents delegate to sub-agents (common in frameworks like those covered in the OpenClaw Multi-Agent System guide), log each sub-agent run separately and roll up to a parent run_id for end-to-end tracing.
Frequently Asked Questions
How many golden dataset cases do I actually need?
Start with 20–30 carefully curated cases covering the core user journeys. Quality matters far more than quantity — 20 well-annotated cases with detailed rubrics will catch more regressions than 200 shallow ones. Expand the dataset when you discover production failure modes your existing cases don’t cover.
Should I use the same model as judge that I’m evaluating?
No — avoid using the model under test as its own judge. Self-evaluation introduces systematic bias toward the model’s own style and reasoning patterns. Use a different frontier model or a different capability tier. If you’re evaluating a Haiku-based agent, use Opus 4.6 as the judge.
How do I handle non-determinism when comparing two agent versions?
Run each case at least 3–5 times per version and compare the mean and standard deviation of scores rather than individual runs. A version with a higher mean and lower variance is strictly better. For A/B testing in production, route a percentage of traffic to each version and run a Mann-Whitney U test on judge scores after collecting ≥100 runs per variant.
What is a realistic pass rate threshold for production release?
This depends heavily on task complexity, but a common threshold structure is: ≥ 80% judge pass rate for baseline release, ≥ 90% for general availability, and a hard stop if tool F1 drops below 0.70 (meaning the agent is calling the wrong tools more than 30% of the time). Set thresholds based on your domain’s risk tolerance, not generic benchmarks.
How do I evaluate cost efficiency, not just correctness?
Track cost per successful task rather than average cost per run. Compute it as: total_cost_usd / (total_runs * judge_pass_rate). This single number captures both the raw API cost and the waste from failed runs — an agent that’s cheap per call but only passes 50% of cases is more expensive than one that costs twice as much per call but passes 95%.