Intermediate CrewAI Tutorial, 5 min read

From Dev to Prod: Deploying CrewAI Applications with FastAPI and Docker

#crewai #deployment #fastapi #docker #production

Going from a working prototype to a production-ready service is where many AI projects stall. This guide covers that journey: taking a multi-agent crew from a local Python script to a containerized REST API that can run on any cloud platform that hosts Docker containers. It assumes you already understand CrewAI’s core model; if you need a foundation first, the LangChain Agents and Tools: Build Agents That Take Action article provides useful context on how agent frameworks think about tool use and execution flow.

Setting Up Your CrewAI Project

Start with the crewai CLI to scaffold a project with the correct layout. The standard directory structure it generates keeps configuration separate from logic, which is critical when you start wiring environment variables per deployment environment.

pip install crewai crewai-tools
crewai create crew content_pipeline
cd content_pipeline

The generated project looks like this:

content_pipeline/
├── .gitignore
├── pyproject.toml
├── README.md
├── .env
└── src/
    └── content_pipeline/
        ├── __init__.py
        ├── main.py
        ├── crew.py
        ├── tools/
        │   ├── custom_tool.py
        │   └── __init__.py
        └── config/
            ├── agents.yaml
            └── tasks.yaml

Install the project dependencies using the crewai installer, which reads pyproject.toml:

crewai install

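Add fastapi and uvicorn to the dependencies in pyproject.toml. Recent crewai releases scaffold a PEP 621 [project] table (managed with uv) rather than a Poetry section, so the entries go in the dependencies list:

[project]
# keep the generated name, version, and other fields as they are
requires-python = ">=3.10,<3.13"
dependencies = [
    "crewai[tools]>=0.80.0,<1.0.0",
    "fastapi>=0.111.0",
    "uvicorn[standard]>=0.30.0",
    "python-dotenv>=1.0.0",
]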

Then reinstall:

crewai install

Defining Agents, Tasks, and Crew

The recommended production pattern uses the @CrewBase decorator combined with @agent, @task, and @crew method decorators. This approach loads agent and task configuration from YAML files, keeping prompts and model settings out of your Python code.

First, define your agents in src/content_pipeline/config/agents.yaml:

researcher:
  role: >
    Senior Research Analyst
  goal: >
    Uncover accurate, up-to-date information on {topic}
  backstory: >
    You are a meticulous analyst with a talent for finding
    the most relevant technical information and synthesizing it clearly.

writer:
  role: >
    Technical Content Writer
  goal: >
    Write engaging, developer-focused content about {topic}
  backstory: >
    You transform dense technical research into clear, practical
    tutorials that developers actually enjoy reading.

Then define tasks in src/content_pipeline/config/tasks.yaml:

research_task:
  description: >
    Research the topic: {topic}. Find key concepts, recent developments,
    and practical implementation patterns. Aim for depth over breadth.
  expected_output: >
    A detailed research brief covering core concepts, implementation
    patterns, and key facts about {topic}.
  agent: researcher

writing_task:
  description: >
    Using the research brief, write a complete technical article about {topic}.
    Include code examples and practical recommendations.
  expected_output: >
    A complete markdown article of 800–1200 words, including code blocks,
    aimed at intermediate developers.
  agent: writer
  output_file: output/article.md

Now implement the crew class in src/content_pipeline/crew.py:

from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai_tools import SerperDevTool
from typing import List

@CrewBase
class ContentPipelineCrew:
    """Content research and writing crew."""

    agents: List[BaseAgent]
    tasks: List[Task]

    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    @agent
    def researcher(self) -> Agent:
        return Agent(
            config=self.agents_config["researcher"],  # type: ignore[index]
            tools=[SerperDevTool()],
            verbose=True,
        )

    @agent
    def writer(self) -> Agent:
        return Agent(
            config=self.agents_config["writer"],  # type: ignore[index]
            verbose=True,
        )

    @task
    def research_task(self) -> Task:
        return Task(
            config=self.tasks_config["research_task"],  # type: ignore[index]
        )

    @task
    def writing_task(self) -> Task:
        return Task(
            config=self.tasks_config["writing_task"],  # type: ignore[index]
        )

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True,
        )

The @CrewBase decorator handles YAML loading and wires the decorated methods together automatically — self.agents and self.tasks are populated by the @agent and @task decorators respectively.
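To make that wiring less opaque, here is a toy sketch of the general mark-and-collect decorator pattern. It is not CrewAI's implementation: crew_base, agent, and DemoCrew are hypothetical names used only for illustration, and the real decorators also handle YAML loading, tasks, and more.

```python
from typing import Callable, List


def agent(method: Callable) -> Callable:
    """Mark a method as an agent factory (toy stand-in, not crewai's @agent)."""
    method._is_agent = True
    return method


def crew_base(cls: type) -> type:
    """Toy stand-in for a class decorator that collects marked methods."""
    original_init = cls.__init__

    def __init__(self, *args, **kwargs):
        original_init(self, *args, **kwargs)
        # vars(cls) preserves definition order, so agents keep the order
        # in which their factory methods were written.
        self.agents: List[str] = [
            getattr(self, name)()
            for name, member in vars(cls).items()
            if getattr(member, "_is_agent", False)
        ]

    cls.__init__ = __init__
    return cls


@crew_base
class DemoCrew:
    @agent
    def researcher(self) -> str:
        return "researcher-agent"

    @agent
    def writer(self) -> str:
        return "writer-agent"

    def helper(self) -> str:  # unmarked, so it is not collected
        return "not-an-agent"


demo_agents = DemoCrew().agents
```

The point of the sketch is only that a class decorator can discover marked methods and build a list from them, which is why the crew method can refer to self.agents without you assembling that list by hand.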

Building the FastAPI Wrapper

Crew executions are long-running operations, often 30–120 seconds. Do not call crew.kickoff() directly inside an async def route handler: the call is synchronous, so it blocks the event loop and the process cannot serve any other request until the crew finishes. Use kickoff_async() (thread-based async) or, where your CrewAI version provides it, akickoff() (native async) instead.
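The effect of a blocking call on the event loop can be seen with the standard library alone. In this minimal sketch, slow_crew_run is a hypothetical stand-in for a synchronous crew run, and a heartbeat task plays the role of other requests the server should keep handling. The thread-based path uses asyncio.to_thread, which is an assumption about the general technique and not a claim about CrewAI internals.

```python
import asyncio
import time


def slow_crew_run(seconds: float) -> str:
    """Hypothetical stand-in for a synchronous, long-running crew call."""
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: list, interval: float) -> None:
    """Record a timestamp every `interval` seconds while the loop is free."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def count_ticks(blocking: bool, work_seconds: float = 0.3,
                      interval: float = 0.05) -> int:
    """Run the slow job and report how often the heartbeat managed to fire."""
    ticks: list = []
    hb = asyncio.create_task(heartbeat(ticks, interval))
    await asyncio.sleep(0)  # let the heartbeat record its first tick

    if blocking:
        slow_crew_run(work_seconds)  # blocks the loop: heartbeat is starved
    else:
        await asyncio.to_thread(slow_crew_run, work_seconds)  # loop stays free

    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return len(ticks)


if __name__ == "__main__":
    print("ticks while blocking:", asyncio.run(count_ticks(blocking=True)))
    print("ticks with a thread: ", asyncio.run(count_ticks(blocking=False)))
```

In the blocking case the heartbeat fires only once, before the job starts; with the worker thread it keeps firing throughout. In a FastAPI service, those missed ticks are requests that wait.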

Create src/content_pipeline/api.py:

import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from dotenv import load_dotenv
import uuid
import os

from content_pipeline.crew import ContentPipelineCrew

load_dotenv()

# In-memory job store — replace with Redis or a DB in real production
job_store: dict[str, dict] = {}


class RunRequest(BaseModel):
    topic: str


class JobStatus(BaseModel):
    job_id: str
    status: str  # "pending" | "running" | "done" | "error"
    result: str | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: warm up anything needed
    yield
    # Shutdown: clean up


app = FastAPI(title="ContentPipeline API", lifespan=lifespan)


async def run_crew_job(job_id: str, topic: str) -> None:
    """Background coroutine: runs the crew and stores the result."""
    job_store[job_id]["status"] = "running"
    try:
        crew_instance = ContentPipelineCrew().crew()
        result = await crew_instance.kickoff_async(inputs={"topic": topic})
        job_store[job_id]["status"] = "done"
        job_store[job_id]["result"] = str(result)
    except Exception as exc:
        job_store[job_id]["status"] = "error"
        job_store[job_id]["result"] = str(exc)


@app.post("/run", response_model=JobStatus, status_code=202)
async def run_crew(request: RunRequest, background_tasks: BackgroundTasks):
    """
    Accepts a topic, starts an async crew run, and returns a job ID.
    Poll /status/{job_id} to retrieve the result.
    """
    job_id = str(uuid.uuid4())
    job_store[job_id] = {"status": "pending", "result": None}
    background_tasks.add_task(run_crew_job, job_id, request.topic)
    return JobStatus(job_id=job_id, status="pending")


@app.get("/status/{job_id}", response_model=JobStatus)
async def get_status(job_id: str):
    if job_id not in job_store:
        raise HTTPException(status_code=404, detail="Job not found")
    job = job_store[job_id]
    return JobStatus(job_id=job_id, status=job["status"], result=job["result"])


@app.get("/health")
async def health():
    return {"status": "ok"}

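This pattern (submit a job, receive a job ID, poll for the result) suits long-running agent workloads because no HTTP request stays open while the crew runs. With multiple worker processes, each process holds its own job_store, so a status request can reach a worker that never saw the job; for that setup, replace job_store with a Redis backend or a lightweight queue like ARQ.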
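On the client side, the polling half of this pattern could look like the sketch below. poll_job is a hypothetical helper, not part of FastAPI or CrewAI; it takes any callable that returns the JSON body of GET /status/{job_id}, so the HTTP client is left to you. The status names match the JobStatus model above.

```python
import time
from typing import Callable, Dict

# States after which polling should stop, matching the JobStatus model.
TERMINAL_STATES = {"done", "error"}


def poll_job(
    fetch_status: Callable[[], Dict],
    interval: float = 2.0,
    timeout: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict:
    """Call fetch_status() until the job is terminal or the timeout passes."""
    deadline = time.monotonic() + timeout
    while True:
        job = fetch_status()
        if job["status"] in TERMINAL_STATES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {job['status']!r} after {timeout}s")
        sleep(interval)


# Usage with a fake status source standing in for GET /status/{job_id}.
responses = iter([
    {"status": "pending", "result": None},
    {"status": "running", "result": None},
    {"status": "done", "result": "article text"},
])
final = poll_job(lambda: next(responses), sleep=lambda _: None)
```

Injecting the sleep function keeps the helper easy to test; in real use you would pass a function that performs the HTTP request and returns the parsed JSON.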

Also see how Advanced AutoGen: Empowering Agents with Custom Tools and Functions handles multi-agent async execution for a comparable pattern in a different framework.

Environment Variables and Secrets Management

CrewAI resolves the active LLM through the MODEL environment variable. Set it alongside the appropriate provider API key in your .env file:

# .env  — NEVER commit this file
MODEL=gpt-4o
OPENAI_API_KEY=sk-...

# Optional: for SerperDevTool (web search)
SERPER_API_KEY=...

For an alternative LLM provider, swap the MODEL value — CrewAI delegates to LiteLLM under the hood, so the same pattern works for Anthropic, Google, and others:

MODEL=claude-3-5-sonnet-20241022
ANTHROPIC_API_KEY=sk-ant-...

In Docker and cloud environments, never bake secrets into the image. Pass them as runtime environment variables or use a secrets manager.
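Because secrets arrive only at runtime, a container can start without them and fail much later, in the middle of a crew run. One option is a fail-fast check at startup. The sketch below uses only the standard library; require_env and missing_env are hypothetical helper names, and the variable list is an assumption based on the .env example above.

```python
import os
from typing import Iterable, List, Mapping


def missing_env(names: Iterable[str],
                env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names that are unset or blank in the given environment."""
    return [name for name in names if not env.get(name, "").strip()]


def require_env(names: Iterable[str],
                env: Mapping[str, str] = os.environ) -> None:
    """Raise at startup if any required variable is missing."""
    missing = missing_env(names, env)
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )


# Example against an explicit mapping instead of the real environment.
example_env = {"MODEL": "gpt-4o", "OPENAI_API_KEY": ""}
example_missing = missing_env(
    ["MODEL", "OPENAI_API_KEY", "SERPER_API_KEY"], example_env
)
```

Calling require_env([...]) in the startup half of the lifespan function would make a misconfigured container exit immediately with a readable message, without ever logging the secret values themselves.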

Containerizing with Docker

Create a Dockerfile at the project root:

FROM python:3.11-slim

# System deps for common crewai-tools dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential curl \
    && rm -rf /var/lib/apt/lists/*

# Install uv for fast dependency resolution (same tool crewai uses)
RUN pip install uv

WORKDIR /app

# Copy project metadata and source together: installing "." builds the package, so both must be present
COPY pyproject.toml ./
COPY src/ ./src/

# Install dependencies
RUN uv pip install --system --no-cache .

# Create output directory used by writing_task output_file
RUN mkdir -p output

EXPOSE 8000

CMD ["uvicorn", "content_pipeline.api:app", "--host", "0.0.0.0", "--port", "8000"]

Add a docker-compose.yml for local development with secrets:

version: "3.9"

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MODEL=${MODEL}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - SERPER_API_KEY=${SERPER_API_KEY}
    volumes:
      # Mount output dir so articles persist on the host
      - ./output:/app/output
    restart: unless-stopped

Docker Compose reads from your local .env file by default, so your secrets are injected at runtime without ever touching the image layer.

Build and run:

# Build the image
docker compose build

# Start the service
docker compose up -d

# Test the health check
curl http://localhost:8000/health

# Submit a job
curl -X POST http://localhost:8000/run \
  -H "Content-Type: application/json" \
  -d '{"topic": "vector databases for AI agents"}'

# Poll for result (replace with actual job_id)
curl http://localhost:8000/status/<job_id>

Scaling and Production Hardening

A few additions turn the basic setup into a production-ready service:

Multiple workers: Uvicorn runs a single process by default. To use more than one CPU core and keep serving if one process crashes, add gunicorn to your dependencies and use it as the process manager:

CMD ["gunicorn", "content_pipeline.api:app", \
     "--workers", "2", \
     "--worker-class", "uvicorn.workers.UvicornWorker", \
     "--bind", "0.0.0.0:8000", \
     "--timeout", "300"]

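Set --timeout 300 (5 minutes) as a generous ceiling. Gunicorn restarts a worker that has not reported back within the timeout, which is 30 seconds by default, and that can happen mid-run if a long crew step blocks the worker's event loop.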

Rate limiting: add slowapi to cap how often a single client can call the /run endpoint:

from fastapi import BackgroundTasks, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# Key each limit on the client's IP address
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# slowapi needs the route decorator outermost and a parameter named `request`
@app.post("/run", response_model=JobStatus, status_code=202)
@limiter.limit("5/minute")
async def run_crew(request: Request, body: RunRequest, background_tasks: BackgroundTasks):
    ...  # same body as before, reading the topic from body.topic

Persistent job state: the in-memory job_store is lost on container restart and is not shared between worker processes. For resilience, back it with Redis:

import json
import os

import redis.asyncio as aioredis

redis = aioredis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379"))

async def set_job(job_id: str, data: dict) -> None:
    await redis.setex(job_id, 3600, json.dumps(data))  # TTL: 1 hour

async def get_job(job_id: str) -> dict | None:
    raw = await redis.get(job_id)
    return json.loads(raw) if raw else None

For cloud deployments, container platforms such as Railway and Render can deploy a Docker image with environment variables injected at runtime. If you want a comparison of where to host your AI workloads, see Cloud LLM vs Local LLM for AI Agents: The 2026 Decision Guide for a cost/performance breakdown.

Frequently Asked Questions

Why use kickoff_async() instead of kickoff() in FastAPI?

FastAPI is built on Python’s async event loop. Calling the synchronous kickoff() inside an async def route blocks the entire loop, preventing FastAPI from handling any other requests while the crew runs. kickoff_async() runs the crew in a thread pool executor, keeping the event loop free. The native akickoff() method takes this further by using Python’s asyncio directly, which is preferred when you need maximum concurrency.

How do I switch LLM providers without changing code?

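Set the MODEL environment variable to a LiteLLM-compatible model ID (e.g., gpt-4o, anthropic/claude-3-5-sonnet-20241022, gemini/gemini-2.0-flash) and set the matching API key in your .env or Docker environment. Provider-prefixed IDs are the unambiguous form for non-OpenAI models. CrewAI resolves the provider at runtime, so no code changes are needed. With the Compose file above, also add the new key (for example ANTHROPIC_API_KEY) to the environment list, because only the variables listed there reach the container.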

How do I pass different inputs to the same crew for different requests?

Use the inputs argument to kickoff_async(). Every {placeholder} in your YAML description and goal fields is filled in from this dictionary at runtime:

result = await crew.kickoff_async(inputs={"topic": request.topic, "language": "python"})
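A request that omits a key used by a placeholder is easier to debug if it is rejected before the crew starts. The sketch below finds missing keys with the standard library's string.Formatter. missing_inputs and placeholders are hypothetical helpers, and the parsing assumes plain {name} placeholders; it does not claim to mirror CrewAI's own interpolation rules, and template text containing literal braces would need escaping first.

```python
from string import Formatter
from typing import Iterable, Mapping, Set


def placeholders(template: str) -> Set[str]:
    """Return the {name} fields that appear in a template string."""
    return {field for _, field, _, _ in Formatter().parse(template) if field}


def missing_inputs(templates: Iterable[str],
                   inputs: Mapping[str, object]) -> Set[str]:
    """Return placeholder names that the inputs dictionary does not provide."""
    needed: Set[str] = set()
    for template in templates:
        needed |= placeholders(template)
    return needed - set(inputs)


# Example: two template strings in the style of the YAML files above.
templates = [
    "Research the topic: {topic}. Aim for depth over breadth.",
    "Write a tutorial about {topic} with examples in {language}.",
]
gaps = missing_inputs(templates, {"topic": "vector databases"})
```

Here gaps contains only "language", which an API handler could turn into a 422 response instead of a failed crew run.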

Can I run multiple crews in parallel from the same FastAPI instance?

Yes. Each request creates its own ContentPipelineCrew() instance and its own kickoff_async() call, so crews are isolated and run concurrently, bounded by the thread pool, the Gunicorn worker count, and LLM API rate limits. The in-memory job_store is visible only inside the process that created it, which is why a shared store such as Redis is recommended once you run more than one worker.
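If LLM rate limits are the binding constraint, one way to cap concurrent runs inside a single process is an asyncio.Semaphore. This is a minimal sketch under that assumption: RunLimiter is a hypothetical class, fake_crew_run stands in for an awaited crew run, and the limit applies per process, not across Gunicorn workers.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class RunLimiter:
    """Allow at most `max_concurrent` jobs to run at once in this process."""

    def __init__(self, max_concurrent: int) -> None:
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self.active = 0  # jobs currently running
        self.peak = 0    # highest concurrency observed, for monitoring

    async def run(self, job: Callable[[], Awaitable[T]]) -> T:
        async with self._semaphore:  # waits here while the limit is reached
            self.active += 1
            self.peak = max(self.peak, self.active)
            try:
                return await job()
            finally:
                self.active -= 1


async def demo() -> int:
    """Start five fake jobs with a limit of two and report the peak."""
    limiter = RunLimiter(max_concurrent=2)

    async def fake_crew_run() -> str:
        await asyncio.sleep(0.01)  # stand-in for an awaited crew run
        return "done"

    await asyncio.gather(*(limiter.run(fake_crew_run) for _ in range(5)))
    return limiter.peak


if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(demo()))
```

Jobs beyond the limit simply wait their turn, so callers still receive a job ID immediately and see the status stay "pending" a little longer.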

What happens if a crew run fails mid-execution?

In the implementation above, any exception raised during kickoff_async() is caught and stored in job_store[job_id]["result"] with status = "error". Callers polling /status/{job_id} will see the error message. For automatic retries, wrap the crew call in a retry loop with exponential backoff, or use a task queue like Celery or ARQ that has built-in retry semantics.
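A retry loop with exponential backoff can be sketched with the standard library alone. retry_with_backoff is a hypothetical helper, and the delay values are illustrative assumptions, not recommendations from CrewAI. It takes a zero-argument coroutine function, so a crew call would be wrapped in a small async function or lambda.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    job: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Await job(), retrying on any exception with doubling delays."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await job()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Delay doubles each attempt (1s, 2s, 4s, ...) up to max_delay,
            # plus up to 10% jitter so parallel jobs do not retry in lockstep.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await sleep(delay + random.uniform(0, delay / 10))
    raise AssertionError("unreachable")
```

Retrying a whole crew repeats every LLM call it makes, so in practice you would likely narrow the except clause to errors you know are transient, such as rate-limit or timeout exceptions.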
