What Are LlamaIndex Workflows?
LlamaIndex Workflows are an event-driven orchestration system introduced in LlamaIndex 0.10+. They let you compose complex multi-step AI pipelines where each step is triggered by events from previous steps.
Unlike a simple chain (step1 | step2 | step3), workflows support:
- Branching — different paths based on conditions
- Loops — repeat steps until a condition is met
- Parallelism — run independent steps concurrently
- State management — share data between all steps
- Streaming — emit intermediate results to the caller
Core Concepts
A Workflow consists of:
- Steps (`@step` methods) — individual processing units
- Events — typed messages that trigger steps
- Context — shared state accessible by all steps
- StartEvent — the entry point
- StopEvent — the final output
from llama_index.core.workflow import (
Workflow,
step,
Event,
Context,
StartEvent,
StopEvent,
)
Your First Workflow
from llama_index.core.workflow import Workflow, step, Event, Context, StartEvent, StopEvent
from llama_index.llms.openai import OpenAI
import asyncio
class QueryEvent(Event):
    """Carries the user's question from the retrieve step to the generate step."""

    query: str
class AnswerEvent(Event):
    """Holds a generated answer.

    NOTE(review): declared for illustration but never emitted by any step
    in this example — SimpleRAGWorkflow goes straight to StopEvent.
    """

    answer: str
class SimpleRAGWorkflow(Workflow):
    """Minimal two-step workflow: StartEvent -> retrieve -> QueryEvent -> generate -> StopEvent."""

    @step
    async def retrieve(self, ctx: Context, ev: StartEvent) -> QueryEvent:
        """Step 1: store the query in shared context and pass it forward."""
        # Read the payload once instead of calling ev.get("query") twice.
        query = ev.get("query")
        await ctx.set("query", query)
        return QueryEvent(query=query)

    @step
    async def generate(self, ctx: Context, ev: QueryEvent) -> StopEvent:
        """Step 2: generate an answer with the LLM."""
        llm = OpenAI(model="gpt-4o-mini")
        response = await llm.acomplete(f"Answer this question: {ev.query}")
        # StopEvent's result becomes the workflow's return value.
        return StopEvent(result=str(response))
# Run the workflow
async def main():
    """Build the workflow and run it end to end on one sample question."""
    wf = SimpleRAGWorkflow(timeout=60, verbose=True)
    print(await wf.run(query="What is retrieval-augmented generation?"))

asyncio.run(main())
Real RAG Workflow: Retrieve → Rerank → Synthesize
from llama_index.core.workflow import Workflow, step, Event, Context, StartEvent, StopEvent
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.postprocessor import SentenceTransformerRerank
from pydantic import Field
from typing import List
import asyncio
# Global defaults: components that need an LLM or embedding model pick these
# up from Settings unless explicitly overridden.
Settings.llm = OpenAI(model="gpt-4o-mini")
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")
# Events with typed payloads
class RetrievedEvent(Event):
    """Raw retriever output handed from the retrieve step to rerank."""

    nodes: list = Field(default_factory=list)
class RerankedEvent(Event):
    """Top-ranked nodes plus the original query, consumed by synthesize."""

    nodes: list = Field(default_factory=list)
    query: str = ""
class RAGWorkflow(Workflow):
    """Retrieve -> rerank -> synthesize RAG pipeline over a vector index.

    Retrieves a generous candidate set, prunes it with a cross-encoder
    reranker, then answers the question grounded in the surviving context.
    """

    def __init__(self, index: VectorStoreIndex, **kwargs):
        super().__init__(**kwargs)
        self.index = index
        # Build the reranker once here. Constructing SentenceTransformerRerank
        # inside the rerank step (as before) reloads the cross-encoder model on
        # every single query, which is expensive and wasteful.
        self.reranker = SentenceTransformerRerank(
            model="cross-encoder/ms-marco-MiniLM-L-2-v2",
            top_n=5,
        )

    @step
    async def retrieve(self, ctx: Context, ev: StartEvent) -> RetrievedEvent:
        """Fetch a wide candidate set (top 15) for the reranker to prune."""
        query = ev.get("query", "")
        await ctx.set("query", query)
        retriever = self.index.as_retriever(similarity_top_k=15)
        nodes = await retriever.aretrieve(query)
        return RetrievedEvent(nodes=nodes)

    @step
    async def rerank(self, ctx: Context, ev: RetrievedEvent) -> RerankedEvent:
        """Re-score candidates with the cross-encoder and keep the top 5."""
        query = await ctx.get("query")
        reranked = self.reranker.postprocess_nodes(ev.nodes, query_str=query)
        return RerankedEvent(nodes=reranked, query=query)

    @step
    async def synthesize(self, ctx: Context, ev: RerankedEvent) -> StopEvent:
        """Answer the question using only the reranked context."""
        context_str = "\n\n".join([n.text for n in ev.nodes])
        prompt = (
            f"Context:\n{context_str}\n\n"
            f"Question: {ev.query}\n\n"
            "Answer based on the context above:"
        )
        response = await Settings.llm.acomplete(prompt)
        return StopEvent(result=str(response))
# Build and run
async def main():
    """Load documents, build the index, then ask one question."""
    docs = SimpleDirectoryReader("data/").load_data()
    index = VectorStoreIndex.from_documents(docs)
    wf = RAGWorkflow(index=index, timeout=120, verbose=True)
    answer = await wf.run(query="How do I configure the API rate limits?")
    print(answer)

asyncio.run(main())
Branching: Conditional Routing
Route to different steps based on results:
from llama_index.core.workflow import Workflow, step, Event, Context, StartEvent, StopEvent
from typing import Union
class SimpleQueryEvent(Event):
    """Routes short queries to the fast direct-answer path."""

    query: str
class ComplexQueryEvent(Event):
    """Routes longer queries to the slower multi-step path."""

    query: str
class QueryRoutingWorkflow(Workflow):
    """Branching workflow: the event type returned by classify_query decides
    which handler step fires next."""

    @step
    async def classify_query(
        self, ctx: Context, ev: StartEvent
    ) -> Union[SimpleQueryEvent, ComplexQueryEvent]:
        """Choose a branch by emitting one of two event types."""
        query = ev.get("query", "")
        await ctx.set("query", query)
        # Word-count heuristic; a production system would use an LLM
        # classifier for routing instead.
        if len(query.split()) >= 5:
            return ComplexQueryEvent(query=query)
        return SimpleQueryEvent(query=query)

    @step
    async def handle_simple(self, ctx: Context, ev: SimpleQueryEvent) -> StopEvent:
        """Fast path: a single direct LLM call."""
        answer = await Settings.llm.acomplete(ev.query)
        return StopEvent(result=f"[simple] {answer}")

    @step
    async def handle_complex(self, ctx: Context, ev: ComplexQueryEvent) -> StopEvent:
        """Slow path: ask the LLM to decompose before answering."""
        answer = await Settings.llm.acomplete(
            f"Decompose and answer thoroughly: {ev.query}"
        )
        return StopEvent(result=f"[complex] {answer}")
Parallel Steps
With `@step(num_workers=N)`, a single step can process up to N events of the same type concurrently — fan the events out to it with `ctx.send_event()`:
from llama_index.core.workflow import Workflow, step, Event, Context, StartEvent, StopEvent
import asyncio
class SourceQueryEvent(Event):
    """One search request targeted at a single source."""

    source: str
    query: str

class SearchResultEvent(Event):
    """The result of searching one source."""

    source: str
    results: str

class ParallelSearchWorkflow(Workflow):
    """Fan one query out to several sources, search concurrently, then merge.

    BUG FIX: the previous version had search_sources consume StartEvent
    directly. Only one StartEvent ever exists, so only one SearchResultEvent
    was produced — and merge_results, waiting for three via collect_events(),
    blocked until the workflow timed out. Concurrency requires an explicit
    fan-out step that emits multiple events with ctx.send_event().
    """

    SOURCES = ("knowledge_base", "web", "wiki")

    @step
    async def dispatch(self, ctx: Context, ev: StartEvent) -> SourceQueryEvent:
        """Emit one SourceQueryEvent per source; returning None emits nothing
        beyond the events already sent."""
        query = ev.get("query")
        for source in self.SOURCES:
            ctx.send_event(SourceQueryEvent(source=source, query=query))

    @step(num_workers=3)  # up to 3 SourceQueryEvents handled concurrently
    async def search_sources(self, ctx: Context, ev: SourceQueryEvent) -> SearchResultEvent:
        """Search a single source (simulated with a short sleep)."""
        await asyncio.sleep(0.1)  # simulate async search
        return SearchResultEvent(
            source=ev.source,
            results=f"Results from {ev.source} for: {ev.query}",
        )

    @step
    async def merge_results(self, ctx: Context, ev: SearchResultEvent) -> StopEvent:
        """Buffer results until every source has reported, then merge them."""
        results = ctx.collect_events(ev, [SearchResultEvent] * len(self.SOURCES))
        if results is None:
            return None  # not all results ready yet
        merged = "\n".join(f"[{r.source}] {r.results}" for r in results)
        return StopEvent(result=merged)
Streaming Workflow Outputs
Emit intermediate results to the caller in real-time:
from llama_index.core.workflow import Workflow, step, Event, Context, StartEvent, StopEvent
class ProgressEvent(Event):
    """Intermediate status message streamed to the caller while the workflow runs."""

    message: str
class StreamingWorkflow(Workflow):
    """Single-step workflow that streams progress events before finishing."""

    @step
    async def step1(self, ctx: Context, ev: StartEvent) -> StopEvent:
        """Emit two progress updates (with simulated work), then the result."""
        for message in (
            "Step 1: Retrieving documents...",
            "Step 2: Generating answer...",
        ):
            ctx.write_event_to_stream(ProgressEvent(message=message))
            await asyncio.sleep(1)  # simulate work
        return StopEvent(result="Final answer here")
async def main():
    """Run the workflow, printing progress events as they stream in."""
    handler = StreamingWorkflow(timeout=30).run(query="test question")
    # The handler yields stream events while the workflow is still running.
    async for ev in handler.stream_events():
        if isinstance(ev, ProgressEvent):
            print(f"Progress: {ev.message}")
    # Awaiting the handler gives the final StopEvent result.
    final = await handler
    print(f"Result: {final}")

asyncio.run(main())
Frequently Asked Questions
When should I use Workflows vs. a Query Engine?
Use a Query Engine for single-step RAG (retrieve → synthesize). Use a Workflow when you need multiple steps, branching logic, loops, or parallel execution. A good rule: if you find yourself chaining query engines with custom Python glue code, that’s a Workflow.
How do I share data between workflow steps?
Use ctx.set() and ctx.get():
@step
async def step1(self, ctx: Context, ev: StartEvent) -> MyEvent:
    # ctx.set() stores a value in the Context shared by every step of this run.
    await ctx.set("important_data", {"key": "value"})
    return MyEvent(...)
@step
async def step2(self, ctx: Context, ev: MyEvent) -> StopEvent:
    # ctx.get() reads back what an earlier step stored under the same key.
    data = await ctx.get("important_data")
    return StopEvent(result=data["key"])
Can I use Workflows in production with FastAPI?
Yes — workflows are async-native:
from fastapi import FastAPI
app = FastAPI()

@app.post("/query")
async def query_endpoint(request: QueryRequest):
    # workflow.run() returns an awaitable, so it composes directly with an
    # async endpoint. NOTE(review): assumes `workflow` and the `QueryRequest`
    # model are defined at module scope — confirm in the full application.
    result = await workflow.run(query=request.query)
    return {"answer": result}
How do I handle errors in a workflow step?
Raise exceptions normally — LlamaIndex will catch them and propagate. For graceful degradation, catch exceptions in the step and emit a fallback event:
@step
async def risky_step(self, ctx: Context, ev: StartEvent) -> Union[SuccessEvent, FallbackEvent]:
    """Attempt the risky operation; degrade gracefully instead of failing the run."""
    try:
        result = await do_risky_thing()
        return SuccessEvent(data=result)
    except Exception as e:
        # Broad catch is deliberate here: any failure routes the workflow
        # down the fallback branch rather than aborting the whole run.
        return FallbackEvent(error=str(e))
Next Steps
- LlamaIndex Agents and Tools — Use agents inside workflow steps
- LlamaIndex Advanced Retrieval Techniques — Plug advanced retrievers into workflow steps
- CrewAI Flows and Pipelines — Compare with CrewAI’s event-driven orchestration