If you’ve been following AI development trends, you already know that single-agent systems can only take you so far. Building a multi-agent system is the natural next step once you’ve mastered individual LLM-powered agents, and this tutorial walks through each layer, from the conceptual architecture to a working Python implementation you can run yourself.
Multi-agent systems let you decompose complex tasks across specialized agents that collaborate, delegate, and check each other’s work. Done well, this can improve accuracy and task coverage, and it lets you extend a system by adding specialists rather than growing a single prompt. Let’s build one.
Understanding Multi-Agent Architecture
Before writing a single line of code, you need a clear mental model of how agents interact.
In a multi-agent system (MAS), you have at minimum:
- An orchestrator agent — the “manager” that receives the top-level task, breaks it into subtasks, and routes work to specialists
- Worker agents — specialized agents that each solve one type of problem
- A shared message bus or memory layer — how agents communicate results and pass context
Here’s the high-level flow for a research-and-report system we’ll build in this tutorial:
flowchart TD
U([User Request]) --> O[Orchestrator Agent]
O --> R[Researcher Agent]
O --> W[Writer Agent]
O --> V[Validator Agent]
R -- findings --> O
W -- draft --> O
V -- feedback --> O
O --> F([Final Report])
The orchestrator never does the “work” itself — it manages state and delegates. Each worker agent has a narrow, well-defined responsibility. This separation is what makes multi-agent systems maintainable and extensible.
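The flow above can be sketched without any LLM calls. The following is a minimal illustration, assuming stub workers that are plain functions; `run_pipeline` and the `stub_*` names are hypothetical and only show how the orchestrator routes messages and holds state.

```python
from typing import Callable

# Hypothetical stand-ins for LLM-backed workers: each takes text, returns text.
def stub_researcher(task: str) -> str:
    return f"findings for: {task}"

def stub_writer(findings: str) -> str:
    return f"draft based on [{findings}]"

def stub_validator(draft: str) -> str:
    return "approved" if draft.startswith("draft") else "rejected"

def run_pipeline(request: str, workers: dict[str, Callable[[str], str]]) -> dict[str, str]:
    """Orchestrator: holds state and delegates; it does no domain work itself."""
    state: dict[str, str] = {"request": request}
    state["findings"] = workers["researcher"](request)
    state["draft"] = workers["writer"](state["findings"])
    state["feedback"] = workers["validator"](state["draft"])
    return state

WORKERS = {
    "researcher": stub_researcher,
    "writer": stub_writer,
    "validator": stub_validator,
}
result = run_pipeline("vector databases", WORKERS)
print(result["feedback"])
```

Because the workers are passed in as a dictionary, swapping a stub for a real agent later does not change the orchestration code.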
For a deeper look at reasoning loops inside individual agents, see From Plan to Action: Understanding Core AI Agent Reasoning Loops.
Setting Up Your Environment
We’ll use Python 3.11+ with the Anthropic SDK and a minimal custom orchestration layer. No heavy frameworks — you’ll understand exactly what’s happening.
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install anthropic python-dotenv
Create a .env file:
ANTHROPIC_API_KEY=your_api_key_here
Create your project structure:
mkdir multi_agent_system
cd multi_agent_system
touch main.py orchestrator.py agents.py memory.py
Building the Shared Memory Layer
Before defining the agents, build the memory module they all share. This way, orchestrator.py can import it cleanly.
Create memory.py:
# memory.py: SQLite-backed persistent memory shared across all agents
import sqlite3


class PersistentMemory:
    """Key-value store backed by SQLite. Survives process restarts."""

    def __init__(self, db_path: str = "memory.db"):
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS memory "
            "(key TEXT PRIMARY KEY, value TEXT, "
            " updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
        )
        self.conn.commit()

    def set(self, key: str, value: str) -> None:
        # INSERT OR REPLACE overwrites any existing row with the same key.
        self.conn.execute(
            "INSERT OR REPLACE INTO memory (key, value, updated_at) "
            "VALUES (?, ?, CURRENT_TIMESTAMP)",
            (key, value),
        )
        self.conn.commit()

    def get(self, key: str) -> str | None:
        row = self.conn.execute(
            "SELECT value FROM memory WHERE key = ?", (key,)
        ).fetchone()
        return row[0] if row else None

    def get_all(self) -> dict[str, str]:
        rows = self.conn.execute("SELECT key, value FROM memory").fetchall()
        return {k: v for k, v in rows}

    def delete(self, key: str) -> None:
        self.conn.execute("DELETE FROM memory WHERE key = ?", (key,))
        self.conn.commit()

    def clear(self) -> None:
        self.conn.execute("DELETE FROM memory")
        self.conn.commit()

    def close(self) -> None:
        self.conn.close()
PersistentMemory is intentionally simple: a key-value interface (get, set, get_all, delete, clear) over a single table. Because the data lives in a SQLite file, intermediate results survive an interrupted run and can be read back afterwards.
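To see the persistence in action, here is a small sketch of the same upsert pattern against a temporary database file, using only the standard library. The file path, the `open_store` helper, and the keys are illustrative and not part of the tutorial's code.

```python
import os
import sqlite3
import tempfile

# Illustrative path; the tutorial's class defaults to "memory.db".
db_path = os.path.join(tempfile.mkdtemp(), "demo_memory.db")

def open_store(path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS memory (key TEXT PRIMARY KEY, value TEXT)"
    )
    return conn

# First "process": write a value, overwrite it, then close the connection.
conn = open_store(db_path)
conn.execute("INSERT OR REPLACE INTO memory (key, value) VALUES (?, ?)", ("draft", "v1"))
conn.execute("INSERT OR REPLACE INTO memory (key, value) VALUES (?, ?)", ("draft", "v2"))
conn.commit()
conn.close()

# Second "process": reopen the same file and read what survived.
conn = open_store(db_path)
row = conn.execute("SELECT value FROM memory WHERE key = ?", ("draft",)).fetchone()
count = conn.execute("SELECT COUNT(*) FROM memory").fetchone()[0]
conn.close()
print(row, count)
```

The second write replaces the first rather than adding a row, which is why the primary key on `key` matters.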
Building the Core Agents
Each agent is a Python class with a run(task: str) -> str method. They share no state directly — everything passes through the orchestrator. This is the message-passing architecture pattern, and it’s the safest way to start.
Create agents.py:
# agents.py: Researcher, Writer, and Validator agents
import os

import anthropic
from dotenv import load_dotenv

load_dotenv()
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

SYSTEM_PROMPTS = {
    "researcher": """You are a research specialist. Given a topic or question,
gather key facts, identify important angles, and return a structured summary.
Be concise. Format: bullet points with sources noted where relevant.""",
    "writer": """You are a technical writer. Given a research summary and target audience,
produce a clear, well-structured draft. Use markdown.
Do not add facts not present in the research.""",
    "validator": """You are a quality validator. Given a draft document and its source research,
identify: factual inconsistencies, missing context, logical gaps, or unclear sections.
Return a JSON object with exactly this shape: {"issues": [...], "approved": true|false}""",
}


class BaseAgent:
    def __init__(self, role: str):
        self.role = role
        self.system_prompt = SYSTEM_PROMPTS[role]
        self.history: list[dict] = []

    def run(self, task: str) -> str:
        # Keep the full exchange so follow-up calls see earlier turns.
        self.history.append({"role": "user", "content": task})
        response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=2048,
            system=self.system_prompt,
            messages=self.history,
        )
        result = response.content[0].text
        self.history.append({"role": "assistant", "content": result})
        return result

    def reset(self) -> None:
        """Clear conversation history between revision cycles."""
        self.history = []


class ResearcherAgent(BaseAgent):
    def __init__(self):
        super().__init__("researcher")


class WriterAgent(BaseAgent):
    def __init__(self):
        super().__init__("writer")


class ValidatorAgent(BaseAgent):
    def __init__(self):
        super().__init__("validator")
Each subclass is intentionally minimal. The role string maps to a system prompt; everything else is inherited from BaseAgent. Adding a new specialist means adding one entry to SYSTEM_PROMPTS and one two-line subclass.
Orchestrating Agent Communication
The orchestrator is where the intelligence of your system lives. It manages the task pipeline, stores intermediate results in persistent memory, and decides when to retry, escalate, or finalize.
Create orchestrator.py:
# orchestrator.py: coordinates the Researcher → Writer → Validator pipeline
import json

from agents import ResearcherAgent, WriterAgent, ValidatorAgent
from memory import PersistentMemory

MAX_REVISION_CYCLES = 3


class Orchestrator:
    def __init__(self, db_path: str = "memory.db"):
        self.researcher = ResearcherAgent()
        self.writer = WriterAgent()
        self.validator = ValidatorAgent()
        self.memory = PersistentMemory(db_path=db_path)

    def run(self, topic: str, audience: str = "intermediate developers") -> str:
        print(f"[Orchestrator] Starting pipeline for: '{topic}'")

        # Step 1: Research
        print("[Orchestrator] Dispatching to Researcher...")
        research = self.researcher.run(
            f"Research this topic thoroughly: {topic}"
        )
        self.memory.set("research", research)
        print(f"[Researcher] Done. ({len(research)} chars)")

        # Step 2: Write
        print("[Orchestrator] Dispatching to Writer...")
        draft = self.writer.run(
            f"Write a technical article for {audience}.\n\n"
            f"Research summary:\n{research}"
        )
        self.memory.set("draft", draft)
        print(f"[Writer] Draft complete. ({len(draft)} chars)")

        # Step 3: Validate + revise loop
        for cycle in range(MAX_REVISION_CYCLES):
            print(f"[Orchestrator] Validation cycle {cycle + 1}...")
            current_draft = self.memory.get("draft")
            current_research = self.memory.get("research")
            validation_result = self.validator.run(
                f"Validate this draft against the research.\n\n"
                f"Research:\n{current_research}\n\n"
                f"Draft:\n{current_draft}"
            )
            try:
                validation = json.loads(validation_result)
            except json.JSONDecodeError:
                # Validator returned non-JSON; treat as approved
                print("[Validator] Non-JSON response; treating as approved.")
                break

            if validation.get("approved"):
                print("[Validator] Approved.")
                break

            issues = validation.get("issues", [])
            print(f"[Validator] Found {len(issues)} issue(s). Requesting revision...")

            # Feed issues back to writer for a targeted revision
            self.writer.reset()
            issue_list = "\n".join(f"- {i}" for i in issues)
            revised = self.writer.run(
                f"Revise your draft based on this feedback:\n{issue_list}\n\n"
                f"Original draft:\n{current_draft}\n\n"
                f"Research:\n{current_research}"
            )
            self.memory.set("draft", revised)

        final = self.memory.get("draft")
        return final

    def close(self) -> None:
        self.memory.close()
Note that every intermediate result is written to PersistentMemory. If the process crashes mid-pipeline, you can inspect memory.db directly with any SQLite client to see exactly where it failed.
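The orchestrator above treats any non-JSON validator reply as approved, which is convenient but lets a malformed reply skip review. Models also sometimes surround the JSON with prose or a Markdown code fence. The sketch below is one possible stricter alternative, not the tutorial's method: `parse_validation` is a hypothetical helper that pulls out the outermost braces and fails closed when nothing parses.

```python
import json

def parse_validation(raw: str) -> dict:
    """Extract the outermost {...} span from a model reply and parse it.

    Returns a dict with "issues" and "approved". When no valid JSON object
    is found, the result is marked as not approved (fail closed).
    """
    start, end = raw.find("{"), raw.rfind("}")
    if start != -1 and end > start:
        try:
            data = json.loads(raw[start : end + 1])
        except json.JSONDecodeError:
            data = None
        if isinstance(data, dict):
            issues = data.get("issues")
            return {
                "issues": issues if isinstance(issues, list) else [],
                "approved": data.get("approved") is True,
            }
    return {"issues": ["validator reply was not valid JSON"], "approved": False}

wrapped = 'Review: {"issues": ["missing source"], "approved": false} Thanks.'
print(parse_validation(wrapped))
print(parse_validation("looks good to me"))
```

Failing closed means an unparseable reply triggers another revision cycle instead of ending the loop, so the cycle limit becomes the only exit in that case.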
Wiring It Together with main.py
Create main.py:
# main.py: entry point
from orchestrator import Orchestrator


def main():
    orch = Orchestrator(db_path="memory.db")
    try:
        result = orch.run(
            topic="How vector databases enable semantic search in AI applications",
            audience="senior software engineers",
        )
    finally:
        # Close the SQLite connection even if the pipeline raises.
        orch.close()

    print("\n" + "=" * 60)
    print("FINAL OUTPUT")
    print("=" * 60)
    print(result)


if __name__ == "__main__":
    main()
Run it:
python main.py
You’ll see each agent’s work logged to the console, with the validator loop requesting revisions until the draft is approved or the cycle limit is reached. This is a basic review-revise loop, similar in spirit to the multi-agent conversation patterns in frameworks like AutoGen.
Your project directory should now look like this:
multi_agent_system/
├── .env
├── main.py
├── orchestrator.py
├── agents.py
├── memory.py
└── memory.db ← created automatically on first run
Production Patterns and Best Practices
Once your basic pipeline works, you’ll immediately hit real-world constraints. Here are the patterns that matter:
1. Upgrade Shared Memory for Scale
The PersistentMemory class uses SQLite, which is sufficient for a single-process pipeline. For multi-process or distributed workloads, swap the SQLite backend for Redis:
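import redis


class RedisMemory:
    """Alternative to PersistentMemory backed by Redis."""

    def __init__(self, host="localhost", port=6379, db=0):
        self.r = redis.Redis(host=host, port=port, db=db, decode_responses=True)

    def set(self, key: str, value: str) -> None:
        self.r.set(key, value)

    def get(self, key: str) -> str | None:
        return self.r.get(key)

    def close(self) -> None:
        # Orchestrator.close() calls memory.close(), so the backend needs one too.
        self.r.close()

The Orchestrator only calls .set(), .get(), and .close() on its memory object, so any class with those methods will work. As written, though, Orchestrator.__init__ builds its own PersistentMemory from db_path; to use RedisMemory, change __init__ to accept a memory instance, or assign orch.memory = RedisMemory() after construction.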
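One way to make the backend swappable is to describe the interface with `typing.Protocol` and pass the memory object in. This is a sketch under assumptions: `MemoryBackend`, `DictMemory`, and `InjectableOrchestrator` are hypothetical names, and the orchestrator here is reduced to a single method to keep the example runnable without API access.

```python
from typing import Optional, Protocol

class MemoryBackend(Protocol):
    """Structural interface: any class with these methods qualifies."""
    def set(self, key: str, value: str) -> None: ...
    def get(self, key: str) -> Optional[str]: ...
    def close(self) -> None: ...

class DictMemory:
    """In-process backend, handy for unit tests."""
    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        self._data[key] = value

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)

    def close(self) -> None:
        self._data.clear()

class InjectableOrchestrator:
    """Hypothetical variant of Orchestrator that receives its memory."""
    def __init__(self, memory: MemoryBackend) -> None:
        self.memory = memory

    def record(self, key: str, value: str) -> Optional[str]:
        self.memory.set(key, value)
        return self.memory.get(key)

orch = InjectableOrchestrator(DictMemory())
print(orch.record("draft", "v1"))
```

With this shape, a SQLite, Redis, or dictionary backend can be chosen at construction time without touching the pipeline logic.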
2. Implement Circuit Breakers
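If an agent consistently fails validation, don’t loop forever. MAX_REVISION_CYCLES is already set to 3 in our code. When the loop is exhausted without an approval, log the last issue list and return the best available draft. A for/else keeps that message from printing when the draft was approved:
# In Orchestrator.run(); assumes the loop stores issues under "last_issues".
for cycle in range(MAX_REVISION_CYCLES):
    ...  # validate and revise as before; break on approval
else:
    # Reached only when no cycle hit break, i.e. the draft was never approved.
    issues = self.memory.get("last_issues") or "unknown"
    print(f"[Orchestrator] Max cycles reached. Returning best draft. Issues: {issues}")
return self.memory.get("draft")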
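To return an explicit flag alongside the draft, the loop can be pulled into a standalone function. This is a minimal sketch, assuming `validate` and `revise` are caller-supplied callables (both hypothetical) rather than the tutorial's agent classes.

```python
from typing import Callable

MAX_REVISION_CYCLES = 3

def revise_until_approved(
    draft: str,
    validate: Callable[[str], list[str]],
    revise: Callable[[str, list[str]], str],
    max_cycles: int = MAX_REVISION_CYCLES,
) -> tuple[str, bool, list[str]]:
    """Return (draft, approved, last_issues).

    validate(draft) returns a list of issues; an empty list means approved.
    revise(draft, issues) returns a new draft.
    """
    issues: list[str] = []
    for _ in range(max_cycles):
        issues = validate(draft)
        if not issues:
            return draft, True, []
        draft = revise(draft, issues)
    # Budget exhausted: return the latest draft, flagged as unapproved.
    # Note the issues are those reported before the final revision.
    return draft, False, issues

def always_complains(draft: str) -> list[str]:
    return ["too short"]

def pad(draft: str, issues: list[str]) -> str:
    return draft + "!"

exhausted = revise_until_approved("draft", always_complains, pad)
clean = revise_until_approved("ok", lambda d: [], pad)
print(exhausted, clean)
```

Callers can then decide what an unapproved result means for them, for example routing it to human review.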
3. Use Structured Outputs for Agent Contracts
The validator returns JSON — that’s intentional. When agents communicate structured data, you avoid fragile string parsing. Consider using Pydantic to enforce the schema on the way out:
from pydantic import BaseModel


class ValidationResult(BaseModel):
    issues: list[str]
    approved: bool
    confidence: float = 1.0


# In the orchestrator's revision loop, after json.loads():
result = ValidationResult(**validation)
if result.approved:
    break
4. Think About Parallelism
In our pipeline, research → write → validate is sequential. But many real workflows can parallelize. If your orchestrator needs three independent research subtasks, dispatch them concurrently with asyncio.gather():
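import asyncio


# async_researcher is a placeholder for an agent whose run() is a coroutine;
# the BaseAgent defined earlier is synchronous and would need an async variant.
async def run_parallel_research(topics: list[str]) -> list[str]:
    tasks = [async_researcher.run(t) for t in topics]
    return await asyncio.gather(*tasks)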
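If your agents are synchronous, as the BaseAgent in this tutorial is, one option is to run each blocking call in a worker thread with `asyncio.to_thread` (Python 3.9+). The sketch below uses a hypothetical `blocking_research` function with a short sleep in place of a real API call.

```python
import asyncio
import time

def blocking_research(topic: str) -> str:
    """Stand-in for a synchronous call such as ResearcherAgent().run(topic)."""
    time.sleep(0.2)  # simulate network latency
    return f"summary of {topic}"

async def run_parallel_research(topics: list[str]) -> list[str]:
    # Each blocking call runs in its own worker thread, so the calls overlap
    # instead of running back to back. gather() preserves input order.
    tasks = [asyncio.to_thread(blocking_research, t) for t in topics]
    return await asyncio.gather(*tasks)

start = time.perf_counter()
summaries = asyncio.run(run_parallel_research(["pricing", "latency", "security"]))
elapsed = time.perf_counter() - start
print(summaries, f"{elapsed:.2f}s")
```

With real agents, create one agent instance per task: BaseAgent keeps a mutable history list, so sharing a single instance across threads would interleave conversations.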
5. Observe Everything
Log each agent’s input, output, token count, and latency. Multi-agent bugs are hard to diagnose without traces. A simple decorator is a reasonable starting point; this one records latency and output size:
import time
import functools


def trace(agent_name: str):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.time()
            result = fn(*args, **kwargs)
            elapsed = time.time() - start
            print(f"[TRACE] {agent_name} | {elapsed:.2f}s | {len(result)} chars output")
            return result
        return wrapper
    return decorator

# Usage: the role is only known per instance, so wrap the bound method in
# BaseAgent.__init__:  self.run = trace(self.role)(self.run)
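A decorator placed directly on `BaseAgent.run` cannot see `self.role`, because decorators are evaluated when the class is defined. One workaround is to wrap the bound method when the instance is created. In this sketch, `EchoAgent` is a stub standing in for a BaseAgent subclass, and `TRACES` is a hypothetical in-memory sink that you would replace with real logging.

```python
import functools
import time

TRACES: list[dict] = []  # hypothetical sink; swap for a logger or tracing backend

def trace(agent_name: str):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = fn(*args, **kwargs)
            TRACES.append({
                "agent": agent_name,
                "seconds": time.perf_counter() - start,
                "output_chars": len(result),
            })
            return result
        return wrapper
    return decorator

class EchoAgent:
    """Stub agent with the same run(task) -> str shape as BaseAgent."""
    def __init__(self, role: str):
        self.role = role
        # Wrap the bound method so each instance is traced under its own role.
        self.run = trace(self.role)(self.run)

    def run(self, task: str) -> str:
        return task.upper()

output = EchoAgent("researcher").run("hello")
print(output, TRACES[-1]["agent"])
```

Collecting records as dictionaries rather than printing makes it easier to add fields such as token counts later.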
For memory patterns that extend directly to multi-agent contexts, see A Developer’s Guide to AI Agent Memory: Short-Term vs Long-Term.
Frequently Asked Questions
How many agents should my system have?
Start with the minimum, usually 2 or 3. Each agent adds orchestration complexity, latency, and cost. Add a new agent only when you can articulate a specific, narrowly defined capability it provides that existing agents cannot. Many systems that look complex turn out to be 3–5 well-defined agents with smart routing logic.
How do I prevent agents from hallucinating to each other?
Structured outputs are your main defense. When agents pass results as JSON with explicit fields, downstream agents can validate schema before processing. Also, keep the researcher → writer → validator separation strict: the writer should never invent facts, and the validator explicitly checks for this. Grounding agents in retrieved documents (via RAG) is the next level — see Introduction to Vector Databases: Storing and Retrieving Data for AI Agents for how to add that layer.
What’s the difference between this and using a framework like CrewAI or AutoGen?
Frameworks abstract the orchestration layer, add built-in tool use, and handle concurrency. Building from scratch (as we did here) gives you full control and no magic. The right choice depends on your team: if you’re prototyping fast, use a framework; if you’re building a production system with unusual requirements or strict cost controls, rolling your own orchestrator pays off. The concepts are identical either way.
How do I handle one agent’s failure without crashing the pipeline?
Wrap each agent dispatch in a try/except and return a structured error object instead of raising. The orchestrator should have explicit logic for each failure mode: retry, skip, fallback to a simpler agent, or escalate to human review. Never let an unhandled exception in one agent silently corrupt downstream results.
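As a rough illustration of that advice, the sketch below wraps a dispatch in a retry loop and returns a structured result instead of raising. `AgentResult` and `safe_dispatch` are hypothetical names, and the retry and backoff defaults are arbitrary.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class AgentResult:
    ok: bool
    output: Optional[str] = None
    error: Optional[str] = None
    attempts: int = 0

def safe_dispatch(run: Callable[[str], str], task: str,
                  retries: int = 2, backoff_s: float = 0.0) -> AgentResult:
    """Call an agent; return a structured result instead of raising."""
    last_error = ""
    for attempt in range(1, retries + 2):
        try:
            return AgentResult(ok=True, output=run(task), attempts=attempt)
        except Exception as exc:  # broad on purpose: this is the isolation boundary
            last_error = f"{type(exc).__name__}: {exc}"
            time.sleep(backoff_s * attempt)  # linear backoff between attempts
    return AgentResult(ok=False, error=last_error, attempts=retries + 1)

calls = {"n": 0}

def flaky(task: str) -> str:
    calls["n"] += 1
    if calls["n"] == 1:
        raise TimeoutError("upstream timeout")
    return f"done: {task}"

def broken(task: str) -> str:
    raise ValueError("bad input")

recovered = safe_dispatch(flaky, "research")
failed = safe_dispatch(broken, "research")
print(recovered, failed)
```

The orchestrator can then branch on `ok` and choose between skipping the step, falling back to a simpler agent, or escalating to a human.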
Can agents call other agents recursively?
Yes, and this is how hierarchical multi-agent systems work — an orchestrator delegates to a sub-orchestrator that manages its own team. This scales well for complex tasks (e.g., a research orchestrator that spins up separate geography, economics, and legal research sub-teams). Be careful to implement recursion depth limits and budget guards to prevent runaway API costs.
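Depth limits and budget guards can be quite small. The following sketch assumes a simplified accounting of one model call per node in the delegation tree; `Budget`, `BudgetExceeded`, and `delegate` are hypothetical names, and the two-way split stands in for whatever decomposition a real sub-orchestrator would perform.

```python
from dataclasses import dataclass

class BudgetExceeded(RuntimeError):
    pass

@dataclass
class Budget:
    max_calls: int
    calls: int = 0

    def spend(self) -> None:
        if self.calls >= self.max_calls:
            raise BudgetExceeded(f"call budget of {self.max_calls} exhausted")
        self.calls += 1

def delegate(task: str, budget: Budget, depth: int = 0, max_depth: int = 2) -> list[str]:
    """Split a task recursively until max_depth, charging the shared budget."""
    budget.spend()  # one model call per node, in this simplified accounting
    if depth >= max_depth:
        return [f"{task} (handled at depth {depth})"]
    results: list[str] = []
    for part in ("a", "b"):  # stand-in for real subtask decomposition
        results.extend(delegate(f"{task}.{part}", budget, depth + 1, max_depth))
    return results

budget = Budget(max_calls=10)
leaves = delegate("report", budget)

try:
    delegate("report", Budget(max_calls=5))
    hit_limit = False
except BudgetExceeded:
    hit_limit = True
print(len(leaves), budget.calls, hit_limit)
```

Sharing one `Budget` object across the whole tree is the key design choice: each sub-orchestrator draws from the same pool, so a runaway branch cannot spend more than the total.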