Advanced LlamaIndex Tutorial

LlamaIndex Workflows: Event-Driven AI Pipelines

#llamaindex #workflows #event-driven #pipeline #async #production

What Are LlamaIndex Workflows?

LlamaIndex Workflows are an event-driven orchestration system introduced in LlamaIndex 0.10+. They let you compose complex multi-step AI pipelines where each step is triggered by events from previous steps.

Unlike a simple chain (step1 | step2 | step3), workflows support:

  • Branching — different paths based on conditions
  • Loops — repeat steps until a condition is met
  • Parallelism — run independent steps concurrently
  • State management — share data between all steps
  • Streaming — emit intermediate results to the caller

Core Concepts

A Workflow consists of:

  • Steps (@step methods) — individual processing units
  • Events — typed messages that trigger steps
  • Context — shared state accessible by all steps
  • StartEvent — the entry point
  • StopEvent — the final output

from llama_index.core.workflow import (
    Workflow,
    step,
    Event,
    Context,
    StartEvent,
    StopEvent,
)

Your First Workflow

from llama_index.core.workflow import Workflow, step, Event, Context, StartEvent, StopEvent
from llama_index.llms.openai import OpenAI
import asyncio

class QueryEvent(Event):
    query: str

class AnswerEvent(Event):
    answer: str

class SimpleRAGWorkflow(Workflow):

    @step
    async def retrieve(self, ctx: Context, ev: StartEvent) -> QueryEvent:
        """Step 1: store the query and pass it forward."""
        query = ev.get("query")
        await ctx.set("query", query)
        return QueryEvent(query=query)

    @step
    async def generate(self, ctx: Context, ev: QueryEvent) -> StopEvent:
        """Step 2: generate an answer."""
        query = ev.query
        llm = OpenAI(model="gpt-4o-mini")
        response = await llm.acomplete(f"Answer this question: {query}")
        return StopEvent(result=str(response))

# Run the workflow
async def main():
    workflow = SimpleRAGWorkflow(timeout=60, verbose=True)
    result = await workflow.run(query="What is retrieval-augmented generation?")
    print(result)

asyncio.run(main())

Real RAG Workflow: Retrieve → Rerank → Synthesize

from llama_index.core.workflow import Workflow, step, Event, Context, StartEvent, StopEvent
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.postprocessor import SentenceTransformerRerank
from pydantic import Field
import asyncio

Settings.llm = OpenAI(model="gpt-4o-mini")
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")

# Events with typed payloads
class RetrievedEvent(Event):
    nodes: list = Field(default_factory=list)

class RerankedEvent(Event):
    nodes: list = Field(default_factory=list)
    query: str = ""

class RAGWorkflow(Workflow):

    def __init__(self, index: VectorStoreIndex, **kwargs):
        super().__init__(**kwargs)
        self.index = index

    @step
    async def retrieve(self, ctx: Context, ev: StartEvent) -> RetrievedEvent:
        query = ev.get("query", "")
        await ctx.set("query", query)

        retriever = self.index.as_retriever(similarity_top_k=15)
        nodes = await retriever.aretrieve(query)
        return RetrievedEvent(nodes=nodes)

    @step
    async def rerank(self, ctx: Context, ev: RetrievedEvent) -> RerankedEvent:
        query = await ctx.get("query")
        reranker = SentenceTransformerRerank(
            model="cross-encoder/ms-marco-MiniLM-L-2-v2",
            top_n=5,
        )
        reranked = reranker.postprocess_nodes(ev.nodes, query_str=query)
        return RerankedEvent(nodes=reranked, query=query)

    @step
    async def synthesize(self, ctx: Context, ev: RerankedEvent) -> StopEvent:
        context_str = "\n\n".join([n.text for n in ev.nodes])
        prompt = (
            f"Context:\n{context_str}\n\n"
            f"Question: {ev.query}\n\n"
            "Answer based on the context above:"
        )
        response = await Settings.llm.acomplete(prompt)
        return StopEvent(result=str(response))

# Build and run
async def main():
    documents = SimpleDirectoryReader("data/").load_data()
    index = VectorStoreIndex.from_documents(documents)

    workflow = RAGWorkflow(index=index, timeout=120, verbose=True)
    result = await workflow.run(query="How do I configure the API rate limits?")
    print(result)

asyncio.run(main())

Branching: Conditional Routing

Route to different steps based on results:

from llama_index.core.workflow import Workflow, step, Event, Context, StartEvent, StopEvent
from llama_index.core import Settings
from typing import Union

class SimpleQueryEvent(Event):
    query: str

class ComplexQueryEvent(Event):
    query: str

class QueryRoutingWorkflow(Workflow):

    @step
    async def classify_query(
        self, ctx: Context, ev: StartEvent
    ) -> Union[SimpleQueryEvent, ComplexQueryEvent]:
        query = ev.get("query", "")
        await ctx.set("query", query)

        # Route based on query complexity
        # In production: use an LLM classifier
        if len(query.split()) < 5:
            return SimpleQueryEvent(query=query)
        else:
            return ComplexQueryEvent(query=query)

    @step
    async def handle_simple(self, ctx: Context, ev: SimpleQueryEvent) -> StopEvent:
        # Fast path: direct LLM answer
        response = await Settings.llm.acomplete(ev.query)
        return StopEvent(result=f"[simple] {response}")

    @step
    async def handle_complex(self, ctx: Context, ev: ComplexQueryEvent) -> StopEvent:
        # Slow path: multi-step RAG
        response = await Settings.llm.acomplete(
            f"Decompose and answer thoroughly: {ev.query}"
        )
        return StopEvent(result=f"[complex] {response}")

Parallel Steps

Fan out events with ctx.send_event(), process them concurrently by giving the receiving step num_workers, then gather the results with ctx.collect_events():

from llama_index.core.workflow import Workflow, step, Event, Context, StartEvent, StopEvent
import asyncio

class SearchRequestEvent(Event):
    query: str
    source: str

class SearchResultEvent(Event):
    source: str
    results: str

class ParallelSearchWorkflow(Workflow):

    @step
    async def dispatch(self, ctx: Context, ev: StartEvent) -> SearchRequestEvent:
        # Fan out: emit one event per source; each triggers search_sources
        query = ev.get("query")
        for source in ("knowledge_base", "web", "docs"):
            ctx.send_event(SearchRequestEvent(query=query, source=source))

    @step(num_workers=3)  # up to 3 invocations of this step run concurrently
    async def search_sources(self, ctx: Context, ev: SearchRequestEvent) -> SearchResultEvent:
        # In production: call the real search backend for ev.source
        await asyncio.sleep(0.1)  # simulate an async search
        return SearchResultEvent(
            source=ev.source,
            results=f"Results from {ev.source} for: {ev.query}",
        )

    @step
    async def merge_results(self, ctx: Context, ev: SearchResultEvent) -> StopEvent:
        # Buffer events until all three parallel results have arrived
        results = ctx.collect_events(ev, [SearchResultEvent] * 3)
        if results is None:
            return None  # not all results ready yet

        merged = "\n".join(f"[{r.source}] {r.results}" for r in results)
        return StopEvent(result=merged)

Streaming Workflow Outputs

Emit intermediate results to the caller in real-time:

from llama_index.core.workflow import Workflow, step, Event, Context, StartEvent, StopEvent
import asyncio

class ProgressEvent(Event):
    message: str

class StreamingWorkflow(Workflow):

    @step
    async def step1(self, ctx: Context, ev: StartEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(message="Step 1: Retrieving documents..."))
        await asyncio.sleep(1)  # simulate work

        ctx.write_event_to_stream(ProgressEvent(message="Step 2: Generating answer..."))
        await asyncio.sleep(1)

        return StopEvent(result="Final answer here")

async def main():
    workflow = StreamingWorkflow(timeout=30)
    handler = workflow.run(query="test question")

    # Stream progress events as they arrive
    async for event in handler.stream_events():
        if isinstance(event, ProgressEvent):
            print(f"Progress: {event.message}")

    # Get final result
    result = await handler
    print(f"Result: {result}")

asyncio.run(main())

Frequently Asked Questions

When should I use Workflows vs. a Query Engine?

Use a Query Engine for single-step RAG (retrieve → synthesize). Use a Workflow when you need multiple steps, branching logic, loops, or parallel execution. A good rule: if you find yourself chaining query engines with custom Python glue code, that’s a Workflow.

How do I share data between workflow steps?

Use ctx.set() and ctx.get():

@step
async def step1(self, ctx: Context, ev: StartEvent) -> MyEvent:
    await ctx.set("important_data", {"key": "value"})
    return MyEvent(...)

@step
async def step2(self, ctx: Context, ev: MyEvent) -> StopEvent:
    data = await ctx.get("important_data")
    return StopEvent(result=data["key"])

Can I use Workflows in production with FastAPI?

Yes — workflows are async-native:

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
workflow = RAGWorkflow(index=index, timeout=120)  # RAGWorkflow and index from the example above

class QueryRequest(BaseModel):
    query: str

@app.post("/query")
async def query_endpoint(request: QueryRequest):
    result = await workflow.run(query=request.query)
    return {"answer": str(result)}

How do I handle errors in a workflow step?

Raise exceptions normally; they propagate out of workflow.run(), where the caller can handle them. For graceful degradation, catch the exception inside the step and emit a fallback event:

@step
async def risky_step(self, ctx: Context, ev: StartEvent) -> Union[SuccessEvent, FallbackEvent]:
    try:
        result = await do_risky_thing()
        return SuccessEvent(data=result)
    except Exception as e:
        return FallbackEvent(error=str(e))
