What Are CrewAI Flows?
CrewAI Flows are an event-driven orchestration system that sits above individual crews. Where a Crew runs a fixed set of tasks with a defined process, a Flow lets you:
- Chain multiple crews together
- Branch based on outputs (if/else logic)
- Loop until a condition is met
- Mix crews with regular Python code
- Maintain state across the entire workflow
Think of Flows as the workflow engine and Crews as the execution units. For complex multi-stage pipelines — where the output of one crew determines which crew runs next — Flows are essential.
Core Flow Decorators
from crewai.flow.flow import Flow, listen, start, router, and_, or_
| Decorator | Purpose |
|---|---|
@start() | Marks the entry point of the flow |
@listen(method) | Runs after the specified method completes |
@router(method) | Routes to different methods based on output |
@and_(a, b) | Runs after both a AND b complete |
@or_(a, b) | Runs after either a OR b completes |
Basic Flow: Two Crews in Sequence
from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Task, Crew
from pydantic import BaseModel
class ResearchWriteState(BaseModel):
topic: str = ""
research_findings: str = ""
final_article: str = ""
class ResearchWriteFlow(Flow[ResearchWriteState]):
@start()
def kickoff_research(self):
"""Entry point — run the research crew."""
researcher = Agent(
role="Research Analyst",
goal=f"Research {self.state.topic} thoroughly.",
backstory="Expert researcher with web access.",
)
research_task = Task(
description=f"Research {self.state.topic}. List 5 key findings.",
expected_output="5 clear, accurate findings.",
agent=researcher,
)
crew = Crew(agents=[researcher], tasks=[research_task])
result = crew.kickoff()
self.state.research_findings = result.raw
return result.raw
@listen(kickoff_research)
def write_article(self, research_output: str):
"""Runs after research completes."""
writer = Agent(
role="Technical Writer",
goal="Write a complete article from research findings.",
backstory="Expert writer for developer audiences.",
)
writing_task = Task(
description=f"Write an 800-word article based on:\n{research_output}",
expected_output="A complete article in Markdown.",
agent=writer,
)
crew = Crew(agents=[writer], tasks=[writing_task])
result = crew.kickoff()
self.state.final_article = result.raw
return result.raw
# Run the flow
flow = ResearchWriteFlow()
result = flow.kickoff(inputs={"topic": "vector databases for RAG"})
print(result)
print("\nFull state:", flow.state)
Conditional Routing: Branch Based on Output
The @router decorator enables if/else branching in your workflow:
from crewai.flow.flow import Flow, listen, start, router
from pydantic import BaseModel
import random
class ContentQualityState(BaseModel):
draft: str = ""
quality_score: int = 0
final_output: str = ""
class ContentQualityFlow(Flow[ContentQualityState]):
@start()
def generate_draft(self):
# In production: run a writing crew
self.state.draft = "This is a draft article about AI agents..."
return self.state.draft
@listen(generate_draft)
def evaluate_quality(self, draft: str):
# In production: run a QA crew that returns a score
# Simulate a quality evaluation
self.state.quality_score = random.randint(40, 95)
return self.state.quality_score
@router(evaluate_quality)
def quality_gate(self, score: int):
"""Route based on quality score."""
if score >= 70:
return "publish" # routes to publish_article method
else:
return "revise" # routes to revise_article method
@listen("publish")
def publish_article(self):
self.state.final_output = f"Published! Quality: {self.state.quality_score}/100"
print(f"✅ Article published (score: {self.state.quality_score})")
@listen("revise")
def revise_article(self):
print(f"⚠️ Quality too low ({self.state.quality_score}/100) — requesting revision")
self.state.final_output = "Sent back for revision"
flow = ContentQualityFlow()
flow.kickoff()
print("Result:", flow.state.final_output)
Parallel Execution: @and_ and @or_
Run multiple crews in parallel and wait for completion:
from crewai.flow.flow import Flow, listen, start, and_
from pydantic import BaseModel
class ParallelResearchState(BaseModel):
tech_research: str = ""
market_research: str = ""
final_report: str = ""
class ParallelResearchFlow(Flow[ParallelResearchState]):
@start()
def begin(self):
return "starting"
@listen(begin)
def research_technology(self, _):
# Run tech research crew
self.state.tech_research = "Tech findings: LLMs are becoming multimodal..."
return self.state.tech_research
@listen(begin)
def research_market(self, _):
# Run market research crew (runs in parallel with tech research)
self.state.market_research = "Market findings: AI agent market worth $45B by 2027..."
return self.state.market_research
@listen(and_(research_technology, research_market))
def synthesize_report(self, _):
"""Runs only after BOTH research tasks complete."""
self.state.final_report = (
f"SYNTHESIS REPORT\n"
f"Technology: {self.state.tech_research}\n"
f"Market: {self.state.market_research}"
)
print("Both research streams complete — synthesizing...")
return self.state.final_report
flow = ParallelResearchFlow()
result = flow.kickoff()
print(flow.state.final_report)
Retry Loop: Run Until Quality Threshold
from crewai.flow.flow import Flow, listen, start, router
from pydantic import BaseModel
class IterativeWritingState(BaseModel):
content: str = ""
attempts: int = 0
max_attempts: int = 3
approved: bool = False
class IterativeWritingFlow(Flow[IterativeWritingState]):
@start()
def write_content(self):
self.state.attempts += 1
print(f"Writing attempt {self.state.attempts}...")
# In production: run writing crew
self.state.content = f"Draft {self.state.attempts}: article content here..."
return self.state.content
@listen(write_content)
def review_content(self, content: str):
# In production: run QA crew
# Simulate: approve on third attempt
approved = self.state.attempts >= 3
self.state.approved = approved
return "approved" if approved else "rejected"
@router(review_content)
def decide_next_step(self, verdict: str):
if verdict == "approved":
return "done"
elif self.state.attempts >= self.state.max_attempts:
return "done" # stop after max attempts even if not perfect
else:
return "retry"
@listen("retry")
def retry_write(self):
print(f"Revision needed (attempt {self.state.attempts})...")
return self.write_content() # loop back
@listen("done")
def finish(self):
status = "✅ Approved" if self.state.approved else "⚠️ Max attempts reached"
print(f"{status} after {self.state.attempts} attempt(s)")
return self.state.content
flow = IterativeWritingFlow()
flow.kickoff()
Async Flows
For production use, run flows asynchronously:
import asyncio
from crewai.flow.flow import Flow
class AsyncFlow(Flow):
@start()
async def fetch_data(self):
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com/data") as resp:
return await resp.json()
@listen(fetch_data)
async def process_data(self, data):
# Process with crew
return "processed"
async def main():
flow = AsyncFlow()
result = await flow.kickoff_async()
print(result)
asyncio.run(main())
Visualizing Your Flow
CrewAI can generate a visual graph of your flow:
flow = MyFlow()
flow.plot("my_workflow") # generates my_workflow.html
Open the HTML file in a browser to see the flow as a directed graph — useful for debugging complex workflows.
Frequently Asked Questions
What is the difference between a Flow and a Crew with hierarchical process?
A Crew with hierarchical process has a manager agent that dynamically delegates tasks among a fixed set of agents. A Flow is a static, code-defined pipeline where you explicitly decide what runs when, including running different crews based on conditions. Flows give you more control; hierarchical crews give more agent autonomy.
Can I use state from earlier flow steps in later steps?
Yes — self.state persists across all methods in the flow. Any method can read or write state. This is the primary way to pass data between steps:
@listen(method_a)
def method_b(self, output_from_a):
previous_data = self.state.some_field # read from earlier step
self.state.new_field = output_from_a # write for later steps
How do I handle errors in a flow step?
Wrap the step in try/except and set a state flag:
@listen(previous_step)
def risky_step(self, input):
try:
result = self._do_risky_thing(input)
self.state.error = None
return result
except Exception as e:
self.state.error = str(e)
return "error"
Then route based on the error in a @router.
Can flows call external services or APIs?
Yes — flow methods are regular Python functions. You can call REST APIs, databases, message queues, or anything else within a flow method, alongside or instead of running crews.
Next Steps
- CrewAI Custom Tools and Integrations — Build the tools your flow’s agents need
- LangChain Agents and Tools — Alternative pipeline approach with LangGraph
- n8n AI Workflow Automation — No-code alternative for similar workflow patterns