CrewAI Flows: Event-Driven Pipelines for Complex AI Workflows

What Are CrewAI Flows?

CrewAI Flows are an event-driven orchestration system that sits above individual crews. Where a Crew runs a fixed set of tasks with a defined process, a Flow lets you:

  • Chain multiple crews together
  • Branch based on outputs (if/else logic)
  • Loop until a condition is met
  • Mix crews with regular Python code
  • Maintain state across the entire workflow

Think of Flows as the workflow engine and Crews as the execution units. For complex multi-stage pipelines — where the output of one crew determines which crew runs next — Flows are essential.

Core Flow Decorators

from crewai.flow.flow import Flow, listen, start, router, and_, or_

Name               Purpose
@start()           Marks an entry point of the flow
@listen(method)    Runs after the specified method completes
@router(method)    Runs after the specified method and routes by the string label it returns
and_(a, b)         Condition passed to @listen or @router; fires after both a AND b complete
or_(a, b)          Condition passed to @listen or @router; fires after either a OR b completes
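
To make the event-driven model concrete, here is a minimal toy runner written in plain Python. It is not CrewAI's implementation; `MiniFlow` and the step names are hypothetical and exist only to show the idea that a step finishing is the event that triggers its listeners.

```python
from collections import defaultdict


class MiniFlow:
    """Toy event-driven runner for illustration; not CrewAI's implementation."""

    def __init__(self):
        self.starts = []                     # entry-point functions
        self.listeners = defaultdict(list)   # trigger name -> functions to run next
        self.results = {}                    # step name -> return value

    def start(self, func):
        self.starts.append(func)
        return func

    def listen(self, trigger):
        # A trigger is either a function (run after it) or a string label.
        name = trigger if isinstance(trigger, str) else trigger.__name__

        def decorator(func):
            self.listeners[name].append(func)
            return func

        return decorator

    def run(self):
        order = []
        queue = [(func, ()) for func in self.starts]
        while queue:
            func, args = queue.pop(0)
            result = func(*args)
            order.append(func.__name__)
            self.results[func.__name__] = result
            # Completing a step is the "event" that wakes its listeners.
            for listener in self.listeners[func.__name__]:
                queue.append((listener, (result,)))
        return order


flow = MiniFlow()


@flow.start
def fetch():
    return "raw data"


@flow.listen(fetch)
def clean(data):
    return data.upper()


@flow.listen(clean)
def report(data):
    return f"report on {data}"


order = flow.run()
print(order)  # ['fetch', 'clean', 'report']
```

Each listener receives the return value of the step it listens to, which mirrors how the flow examples in this article pass outputs between methods.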

Basic Flow: Two Crews in Sequence

from crewai.flow.flow import Flow, listen, start
from crewai import Agent, Task, Crew
from pydantic import BaseModel

class ResearchWriteState(BaseModel):
    topic: str = ""
    research_findings: str = ""
    final_article: str = ""

class ResearchWriteFlow(Flow[ResearchWriteState]):

    @start()
    def kickoff_research(self):
        """Entry point — run the research crew."""
        researcher = Agent(
            role="Research Analyst",
            goal=f"Research {self.state.topic} thoroughly.",
            backstory="Expert researcher with web access.",
        )
        research_task = Task(
            description=f"Research {self.state.topic}. List 5 key findings.",
            expected_output="5 clear, accurate findings.",
            agent=researcher,
        )
        crew = Crew(agents=[researcher], tasks=[research_task])
        result = crew.kickoff()
        self.state.research_findings = result.raw
        return result.raw

    @listen(kickoff_research)
    def write_article(self, research_output: str):
        """Runs after research completes."""
        writer = Agent(
            role="Technical Writer",
            goal="Write a complete article from research findings.",
            backstory="Expert writer for developer audiences.",
        )
        writing_task = Task(
            description=f"Write an 800-word article based on:\n{research_output}",
            expected_output="A complete article in Markdown.",
            agent=writer,
        )
        crew = Crew(agents=[writer], tasks=[writing_task])
        result = crew.kickoff()
        self.state.final_article = result.raw
        return result.raw

# Run the flow
flow = ResearchWriteFlow()
result = flow.kickoff(inputs={"topic": "vector databases for RAG"})
print(result)
print("\nFull state:", flow.state)

Conditional Routing: Branch Based on Output

A @router method returns a string label, and every method decorated with @listen("<label>") for that label runs next. This gives you if/else branching:

from crewai.flow.flow import Flow, listen, start, router
from pydantic import BaseModel
import random

class ContentQualityState(BaseModel):
    draft: str = ""
    quality_score: int = 0
    final_output: str = ""

class ContentQualityFlow(Flow[ContentQualityState]):

    @start()
    def generate_draft(self):
        # In production: run a writing crew
        self.state.draft = "This is a draft article about AI agents..."
        return self.state.draft

    @listen(generate_draft)
    def evaluate_quality(self, draft: str):
        # In production: run a QA crew that returns a score
        # Simulate a quality evaluation
        self.state.quality_score = random.randint(40, 95)
        return self.state.quality_score

    @router(evaluate_quality)
    def quality_gate(self, score: int):
        """Route based on quality score."""
        if score >= 70:
            return "publish"  # routes to publish_article method
        else:
            return "revise"   # routes to revise_article method

    @listen("publish")
    def publish_article(self):
        self.state.final_output = f"Published! Quality: {self.state.quality_score}/100"
        print(f"✅ Article published (score: {self.state.quality_score})")

    @listen("revise")
    def revise_article(self):
        print(f"⚠️  Quality too low ({self.state.quality_score}/100) — requesting revision")
        self.state.final_output = "Sent back for revision"

flow = ContentQualityFlow()
flow.kickoff()
print("Result:", flow.state.final_output)
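
Because the example above draws a random score, the branch it takes changes from run to run. One option, sketched here in plain Python, is to keep the threshold decision in a small function that the router method could call, so the branching can be unit-tested without running any crew. The name `route_by_score` and the 0 to 100 range check are assumptions for illustration.

```python
def route_by_score(score: int, threshold: int = 70) -> str:
    """Return the route label a router step would emit for a given score."""
    if not 0 <= score <= 100:
        raise ValueError(f"score must be in 0..100, got {score}")
    return "publish" if score >= threshold else "revise"


labels = [route_by_score(s) for s in (40, 69, 70, 95)]
print(labels)  # ['revise', 'revise', 'publish', 'publish']
```

Inside the flow, `quality_gate` would then reduce to `return route_by_score(score)`.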

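Parallel Execution: and_ and or_

Methods that listen to the same step are all triggered when it completes, so they can run in parallel. Pass and_(...) to @listen to run a downstream step only after every branch has finished, or or_(...) to run it as soon as any branch finishes: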

from crewai.flow.flow import Flow, listen, start, and_
from pydantic import BaseModel

class ParallelResearchState(BaseModel):
    tech_research: str = ""
    market_research: str = ""
    final_report: str = ""

class ParallelResearchFlow(Flow[ParallelResearchState]):

    @start()
    def begin(self):
        return "starting"

    @listen(begin)
    def research_technology(self, _):
        # Run tech research crew
        self.state.tech_research = "Tech findings: LLMs are becoming multimodal..."
        return self.state.tech_research

    @listen(begin)
    def research_market(self, _):
        # Run market research crew (runs in parallel with tech research)
        self.state.market_research = "Market findings: AI agent market worth $45B by 2027..."
        return self.state.market_research

    @listen(and_(research_technology, research_market))
    def synthesize_report(self, _):
        """Runs only after BOTH research tasks complete."""
        self.state.final_report = (
            f"SYNTHESIS REPORT\n"
            f"Technology: {self.state.tech_research}\n"
            f"Market: {self.state.market_research}"
        )
        print("Both research streams complete — synthesizing...")
        return self.state.final_report

flow = ParallelResearchFlow()
result = flow.kickoff()
print(flow.state.final_report)
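
The difference between the two join conditions can be illustrated with plain asyncio, independent of CrewAI. This is only an analogy under stated assumptions: the `research` coroutine and its delays are made up, and the "first" variant cancels the slower branch, which a real flow would not necessarily do.

```python
import asyncio


async def research(name: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for a crew run
    return f"{name} findings"


async def wait_for_both():
    # and_-style join: continue only when every branch has finished.
    return await asyncio.gather(research("tech", 0.02), research("market", 0.01))


async def wait_for_first():
    # or_-style join: continue as soon as any branch finishes.
    tasks = [
        asyncio.create_task(research("tech", 0.2)),
        asyncio.create_task(research("market", 0.01)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)  # let cancellation settle
    return next(iter(done)).result()


both = asyncio.run(wait_for_both())
first = asyncio.run(wait_for_first())
print(both)
print(first)
```

`gather` returns results in the order the branches were passed in, regardless of which finished first.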

Retry Loop: Run Until Quality Threshold

from crewai.flow.flow import Flow, listen, start, router
from pydantic import BaseModel

class IterativeWritingState(BaseModel):
    content: str = ""
    attempts: int = 0
    max_attempts: int = 3
    approved: bool = False

class IterativeWritingFlow(Flow[IterativeWritingState]):

    @start("retry")
    def write_content(self):
        # Runs at kickoff and again each time the router emits "retry".
        # Assumes a CrewAI version where @start accepts a trigger label.
        # Calling self.write_content() directly from another step would not
        # re-trigger review_content, so the loop goes through the router.
        self.state.attempts += 1
        print(f"Writing attempt {self.state.attempts}...")
        # In production: run writing crew
        self.state.content = f"Draft {self.state.attempts}: article content here..."
        return self.state.content

    @listen(write_content)
    def review_content(self, content: str):
        # In production: run QA crew
        # Simulate: approve on third attempt
        approved = self.state.attempts >= 3
        self.state.approved = approved
        return "approved" if approved else "rejected"

    @router(review_content)
    def decide_next_step(self, verdict: str):
        if verdict == "approved":
            return "done"
        elif self.state.attempts >= self.state.max_attempts:
            return "done"  # stop after max attempts even if not perfect
        else:
            print(f"Revision needed (attempt {self.state.attempts})...")
            return "retry"  # loops back to write_content

    @listen("done")
    def finish(self):
        status = "✅ Approved" if self.state.approved else "⚠️ Max attempts reached"
        print(f"{status} after {self.state.attempts} attempt(s)")
        return self.state.content

flow = IterativeWritingFlow()
flow.kickoff()
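
Stripped of the flow machinery, the retry logic above is a bounded loop. The following plain-Python sketch shows the same control flow so the stopping conditions are easy to check; `LoopState`, `run_until_approved`, and the `approve_on` parameter are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass


@dataclass
class LoopState:
    content: str = ""
    attempts: int = 0
    max_attempts: int = 3
    approved: bool = False


def run_until_approved(state: LoopState, approve_on: int) -> LoopState:
    """Write, review, and retry until approved or max_attempts is reached."""
    while True:
        state.attempts += 1
        state.content = f"Draft {state.attempts}"
        # Simulated review: approve once enough attempts have been made.
        state.approved = state.attempts >= approve_on
        if state.approved or state.attempts >= state.max_attempts:
            return state


ok = run_until_approved(LoopState(), approve_on=2)
capped = run_until_approved(LoopState(), approve_on=5)
print(ok.attempts, ok.approved)          # 2 True
print(capped.attempts, capped.approved)  # 3 False
```

The second call shows why the max_attempts check matters: without it, a reviewer that never approves would loop forever.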

Async Flows

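Flow steps can be coroutines, which suits I/O-bound work such as HTTP calls. From code that already runs inside an event loop, await kickoff_async() instead of calling kickoff():

import asyncio
from crewai.flow.flow import Flow, listen, start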

class AsyncFlow(Flow):

    @start()
    async def fetch_data(self):
        import aiohttp
        async with aiohttp.ClientSession() as session:
            async with session.get("https://api.example.com/data") as resp:
                return await resp.json()

    @listen(fetch_data)
    async def process_data(self, data):
        # Process with crew
        return "processed"

async def main():
    flow = AsyncFlow()
    result = await flow.kickoff_async()
    print(result)

asyncio.run(main())
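
An async step often still has to call blocking code, for example a synchronous SDK. One common approach, sketched here with the standard library only and assuming Python 3.9 or later, is to push the blocking call onto a worker thread with `asyncio.to_thread` so the event loop stays free. `blocking_step` is a hypothetical stand-in, not a CrewAI function.

```python
import asyncio
import time


def blocking_step(label: str, seconds: float) -> str:
    time.sleep(seconds)  # stands in for a synchronous SDK or crew call
    return f"{label} done"


async def run_steps():
    # Both blocking calls run in worker threads and overlap in time.
    return await asyncio.gather(
        asyncio.to_thread(blocking_step, "a", 0.05),
        asyncio.to_thread(blocking_step, "b", 0.05),
    )


results = asyncio.run(run_steps())
print(results)  # ['a done', 'b done']
```

Inside an async flow method, the same `await asyncio.to_thread(...)` call could wrap whatever blocking work the step performs.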

Visualizing Your Flow

CrewAI can generate a visual graph of your flow:

flow = MyFlow()  # any Flow subclass, e.g. ResearchWriteFlow
flow.plot("my_workflow")  # generates my_workflow.html

Open the HTML file in a browser to see the flow as a directed graph — useful for debugging complex workflows.

Frequently Asked Questions

What is the difference between a Flow and a Crew with hierarchical process?

A Crew with hierarchical process has a manager agent that dynamically delegates tasks among a fixed set of agents. A Flow is a static, code-defined pipeline where you explicitly decide what runs when, including running different crews based on conditions. Flows give you more control; hierarchical crews give more agent autonomy.

Can I use state from earlier flow steps in later steps?

Yes — self.state persists across all methods in the flow. Any method can read or write state. This is the primary way to pass data between steps:

@listen(method_a)
def method_b(self, output_from_a):
    previous_data = self.state.some_field  # read from earlier step
    self.state.new_field = output_from_a   # write for later steps

How do I handle errors in a flow step?

Wrap the step in try/except and set a state flag:

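@listen(previous_step)
def risky_step(self, payload):
    try:
        result = self._do_risky_thing(payload)  # placeholder for your own logic
        self.state.error = None
        return result
    except Exception as e:
        self.state.error = str(e)
        return "error"

The state model needs an error field for this to work. Then add a @router that listens to risky_step, checks self.state.error, and returns a label that the recovery or follow-up steps listen for.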
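
The pattern can be tried outside a flow. The sketch below, in plain Python, separates the guarded call from the routing decision; `StepState`, `run_guarded`, `route_after_step`, and the labels "handle_error" and "continue" are all hypothetical names chosen for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class StepState:
    error: Optional[str] = None


def run_guarded(state: StepState, action: Callable[[Any], Any], payload: Any) -> Any:
    """Run action(payload); on failure record the message and return 'error'."""
    try:
        result = action(payload)
        state.error = None
        return result
    except Exception as exc:
        state.error = str(exc)
        return "error"


def route_after_step(state: StepState) -> str:
    """Label a router could return based on the recorded error."""
    return "handle_error" if state.error else "continue"


good = StepState()
good_result = run_guarded(good, int, "42")
bad = StepState()
bad_result = run_guarded(bad, int, "abc")
print(good_result, route_after_step(good))  # 42 continue
print(bad_result, route_after_step(bad))    # error handle_error
```

Routing on the state flag rather than on the return value avoids confusing a legitimate result that happens to equal "error" with a failure.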

Can flows call external services or APIs?

Yes — flow methods are regular Python functions. You can call REST APIs, databases, message queues, or anything else within a flow method, alongside or instead of running crews.
