Intermediate Autogen 3 min read

AutoGen Human-in-the-Loop: Keep Humans in Control of AI Agents

#autogen #human-in-the-loop #approval #review #safety #python

Why Human-in-the-Loop Matters

Fully autonomous AI agents are powerful but risky for high-stakes tasks: sending emails, deploying code, making purchases, or modifying databases. Human-in-the-loop (HITL) patterns let agents do the heavy lifting while ensuring a human approves critical actions before they happen.

AutoGen has first-class support for HITL through UserProxyAgent — an agent that represents a human and can pause the workflow to request input. This guide covers patterns from simple approval gates to sophisticated review workflows.

The UserProxyAgent as Human Proxy

UserProxyAgent is unique in AutoGen: it stands in for a person in the conversation. By default it pauses the workflow and asks for console input; the input_func parameter lets you supply any callable (sync or async) that produces the "human" response, which is how you automate or customize the behavior.

from autogen_agentchat.agents import UserProxyAgent

# Option 1: Always ask the human (blocks on console input)
human = UserProxyAgent(name="human", input_func=input)

# Option 2: Never interrupt — auto-approve everything
auto = UserProxyAgent(name="auto", input_func=lambda prompt: "APPROVE")

# Option 3: Custom logic — ask the human only sometimes
def smart_input(prompt: str) -> str:
    if "APPROVE" in prompt or "confirm" in prompt.lower():
        return input(f"[HUMAN NEEDED] {prompt}")
    return ""  # auto-continue otherwise

selective = UserProxyAgent(name="selective", input_func=smart_input)

Pattern 1: Simple Approval Gate

The most common pattern: the AI drafts an action, the human approves or rejects it.

import asyncio
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")

    # AI drafts an email
    drafter = AssistantAgent(
        name="drafter",
        model_client=model_client,
        system_message=(
            "You draft professional emails. After drafting, present it and ask "
            "the human to type APPROVE to send or REVISE with feedback."
        ),
    )

    # Human reviews
    reviewer = UserProxyAgent(
        name="human_reviewer",
        input_func=input,  # blocks until human types something
    )

    # Restrict the match to the reviewer, otherwise the drafter's own
    # "type APPROVE to send" instruction would end the chat immediately
    termination = TextMentionTermination("APPROVE", sources=["human_reviewer"])

    team = RoundRobinGroupChat(
        [drafter, reviewer],
        termination_condition=termination,
    )

    await Console(team.run_stream(
        task="Draft an email to our vendor requesting a 10% discount on the Q2 order."
    ))

asyncio.run(main())

When the AI presents the draft, the terminal pauses and waits for input. Type APPROVE to end, or type feedback like "Make it more formal" and the AI will revise.

Pattern 2: Multi-Step Workflow with Checkpoints

For multi-step processes (plan → code → deploy), add human checkpoints between stages:

import asyncio
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def run_with_checkpoint(task: str) -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")

    planner = AssistantAgent(
        name="planner",
        model_client=model_client,
        system_message=(
            "Create a detailed implementation plan. End your plan with "
            "'PLAN READY — please review and type PROCEED or CANCEL.'"
        ),
    )

    checkpoint = UserProxyAgent(
        name="checkpoint",
        input_func=input,
    )

    # Phase 1: Planning (with human approval)
    plan_team = RoundRobinGroupChat(
        [planner, checkpoint],
        # Match only the human's messages — the planner's own system prompt
        # mentions PROCEED and would otherwise trigger termination instantly
        termination_condition=TextMentionTermination("PROCEED", sources=["checkpoint"]),
        max_turns=6,
    )

    from autogen_agentchat.ui import Console
    result = await Console(plan_team.run_stream(task=task))

    # Check whether the human approved — typing PROCEED is what ends the chat
    approved = any(
        "PROCEED" in str(m.content).upper()
        for m in result.messages
        if m.source == "checkpoint"
    )
    if not approved:
        print("Task cancelled by human.")
        return

    # The plan lives in the planner's last message, not the human's reply
    plan_text = next(
        (str(m.content) for m in reversed(result.messages) if m.source == "planner"),
        "",
    )

    # Phase 2: Implementation (proceeds only after approval)
    from autogen_agentchat.agents import CodeExecutorAgent
    from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor

    coder = AssistantAgent(
        name="coder",
        model_client=model_client,
        system_message="Implement the approved plan. Say DONE when complete.",
    )
    executor = CodeExecutorAgent(
        name="executor",
        code_executor=LocalCommandLineCodeExecutor(work_dir="./output"),
    )

    impl_team = RoundRobinGroupChat(
        [coder, executor],
        termination_condition=TextMentionTermination("DONE"),
    )
    await Console(impl_team.run_stream(task=f"Implement this plan:\n{plan_text}"))

asyncio.run(run_with_checkpoint(
    "Create a Python script that monitors a folder and sends an alert when files are added"
))

Pattern 3: Async Input (Non-Blocking)

For web applications, you don’t want a blocking input() call. Use asyncio with a queue instead:

import asyncio
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Queue for human responses (simulate web UI sending messages)
human_response_queue: asyncio.Queue[str] = asyncio.Queue()

async def async_input(prompt: str, cancellation_token: CancellationToken | None = None) -> str:
    """Non-blocking input — waits for a message from the queue."""
    print(f"\n[AWAITING HUMAN INPUT]\n{prompt}\n")
    return await human_response_queue.get()

async def simulate_human_responses():
    """Simulate a human reviewing and approving after 2 seconds."""
    await asyncio.sleep(2)
    await human_response_queue.put("Looks good, please APPROVE and proceed.")

async def main():
    model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")

    assistant = AssistantAgent(
        name="assistant",
        model_client=model_client,
        system_message="Propose a plan. Ask for human approval. Say DONE when approved.",
    )

    human = UserProxyAgent(
        name="human",
        input_func=async_input,  # async version
    )

    team = RoundRobinGroupChat(
        [assistant, human],
        termination_condition=TextMentionTermination("DONE"),
    )

    # Run agent and human simulation concurrently
    from autogen_agentchat.ui import Console
    await asyncio.gather(
        Console(team.run_stream(task="Plan a 3-step database migration")),
        simulate_human_responses(),
    )

asyncio.run(main())

This pattern integrates with FastAPI, Django, or any async web framework: the frontend sends human responses to the queue, and the agent continues when it receives them.

Pattern 4: Tiered Autonomy

Different actions warrant different levels of oversight:

def tiered_input(prompt: str) -> str:
    """Auto-approve safe actions, ask humans for risky ones."""
    prompt_lower = prompt.lower()

    # Risky actions — always ask
    risky_keywords = ["delete", "drop table", "git push", "deploy", "send email", "payment"]
    if any(kw in prompt_lower for kw in risky_keywords):
        print(f"\n⚠️  HIGH RISK ACTION DETECTED\n{prompt}")
        return input("Type APPROVE to proceed or anything else to cancel: ")

    # Low-risk — auto-approve
    print(f"[Auto-approved] {prompt[:80]}...")
    return "APPROVE"

human = UserProxyAgent(name="human", input_func=tiered_input)

This gives you fine-grained control: the agent runs autonomously for routine tasks but always pauses for irreversible actions.

Logging Human Decisions

For audit trails, log every human decision:

import json
from datetime import datetime
from pathlib import Path

decisions_log = []

def logging_input(prompt: str) -> str:
    response = input(f"\n[REVIEW NEEDED]\n{prompt}\nYour response: ")
    decisions_log.append({
        "timestamp": datetime.now().isoformat(),
        "prompt_preview": prompt[:200],
        "human_response": response,
    })
    return response

# After the run, save the audit log
Path("audit_log.json").write_text(json.dumps(decisions_log, indent=2))
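If you already have a custom input_func (like tiered_input above), the same audit logic can be applied as a decorator so any approval policy gets logging for free. A sketch using an in-memory log as above; `auto_approve` is a hypothetical stand-in policy:

```python
import functools
from datetime import datetime

audit_log: list[dict] = []

def audited(input_func):
    """Wrap any sync input_func so every human decision is recorded."""
    @functools.wraps(input_func)
    def wrapper(prompt: str) -> str:
        response = input_func(prompt)
        audit_log.append({
            "timestamp": datetime.now().isoformat(),
            "prompt_preview": prompt[:200],
            "human_response": response,
        })
        return response
    return wrapper

@audited
def auto_approve(prompt: str) -> str:
    # Stand-in policy for the demo; swap in tiered_input, input, etc.
    return "APPROVE"

print(auto_approve("Send the email?"))  # → APPROVE
print(len(audit_log))                   # → 1
```

You can then pass the wrapped callable directly, e.g. input_func=audited(tiered_input), and keep logging concerns out of the decision logic.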

Frequently Asked Questions

When should I use HITL vs fully autonomous agents?

Use HITL for: irreversible actions (send email, delete data, make payments), high-stakes decisions (deploy to production, modify live databases), tasks where errors are expensive, and regulatory compliance requirements. Use fully autonomous agents for: read-only tasks, sandboxed environments, low-stakes automation where errors are recoverable.

Can I specify which agent triggers human review, not all?

Yes. Only UserProxyAgent participants with a blocking input_func pause for human input; AssistantAgents continue automatically. In a three-agent team, you can let the first two run autonomously and add a UserProxyAgent as the third to require human approval.

How do I integrate with Slack or Teams for approvals?

Replace input_func with a function that sends a message to Slack and waits for a reply:

import asyncio

# Sketch — assumes an initialized slack_sdk AsyncWebClient (`slack_client`)
# and a `check_for_slack_reply()` helper you implement (poll the channel or
# use the Slack Events API)
async def slack_input(prompt: str) -> str:
    # Post the approval request to a Slack channel
    await slack_client.chat_postMessage(channel="#approvals", text=prompt)
    # Wait for a reply
    while True:
        await asyncio.sleep(5)
        reply = await check_for_slack_reply()
        if reply:
            return reply

Can the human add context mid-conversation?

Yes. Any text the human types becomes a message in the conversation. The AI reads it and incorporates it into the next response. You can correct, redirect, or add information at any approval step.

What happens if the human doesn’t respond?

The input() function blocks indefinitely. For production, use async input with a timeout:

async def input_with_timeout(prompt: str, timeout: int = 300) -> str:
    print(prompt)
    try:
        return await asyncio.wait_for(
            asyncio.get_running_loop().run_in_executor(None, input),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        return "TIMEOUT — proceeding with safe default"
