Intermediate · AgentScope · 7 min read

Orchestrating Complex Workflows in AgentScope with Pipelines

#AgentScope #Workflows #Pipelines #AgentCollaboration

Introduction to AgentScope Pipelines

Orchestrating complex workflows in AgentScope with Pipelines is one of the most powerful skills you can develop when building production-grade multi-agent applications. As your agent systems grow beyond a simple request-response pattern, you need a structured way to coordinate multiple agents, control execution order, and route messages between participants. AgentScope provides exactly this through its pipeline system — a composable, async-native orchestration layer that sits between your agents and your business logic.

If you are already familiar with multi-agent coordination patterns (see Multi-Agent Orchestration Patterns: LangGraph, CrewAI, and AutoGen Compared), you will notice that AgentScope takes a distinct approach. Rather than building graph-based state machines, it emphasizes hub-and-spoke message routing combined with functional pipeline composition. This makes workflows easier to reason about and modify incrementally.

This article assumes you have AgentScope installed (pip install agentscope, Python 3.10+) and understand the basics of agents and messages. If you are coming from another framework, the Getting Started with AutoGen: Build Your First Multi-Agent System article provides useful contrast for how AgentScope differs in its design philosophy.


Core Concepts: Agents, MsgHub, Pipelines, and Messages

Before writing any pipeline code, you need a firm grasp of the four building blocks AgentScope uses.

Agents are the nodes in your workflow graph. Each agent is a self-contained actor that receives a Msg, processes it, and optionally emits a response. AgentScope ships with specialized agent classes like ReActAgent (for tool-using, reasoning agents) and UserAgent (for injecting human input into a workflow). Avoid the deprecated DialogAgent and DictDialogAgent classes from pre-v1.0 — they are no longer maintained.

MsgHub is the central nervous system of a multi-agent workflow. It manages a group of participants and handles message routing based on sender and recipient fields in each Msg. Crucially, MsgHub is used as an async context manager (async with MsgHub(...) as hub:), meaning the message-passing infrastructure is only active within that block. This scoping prevents message leakage between different conversation sessions.

Pipelines are functions that encode the control flow over a set of agents registered in a MsgHub. The primary built-in pipeline is sequential_pipeline, which calls agents one after another in a defined order. AgentScope also provides multiagent_concurrent for parallel execution. You can compose these, wrap them in conditionals, and nest them to build arbitrarily complex workflows.

The Msg object carries the actual data. Every Msg has three critical fields:

  • name — the sender’s identifier
  • content — the payload (text, structured data, etc.)
  • role — the speaker role ("user", "assistant", "system")

The MsgHub reads these fields to determine routing, so naming agents consistently is important.
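To make the routing idea concrete, here is a framework-free sketch: plain dicts stand in for Msg objects and a toy deliver function stands in for the hub's fan-out logic. This illustrates the concept only, not AgentScope's actual implementation, and all names here are hypothetical.

```python
# Toy stand-in for hub-style routing. Plain dicts play the role of Msg,
# and deliver() plays the role of the hub's fan-out logic.

def make_msg(name, content, role):
    """Mirror the three critical Msg fields."""
    return {"name": name, "content": content, "role": role}

def deliver(msg, participants):
    """Broadcast to every participant except the sender."""
    return [p for p in participants if p != msg["name"]]

msg = make_msg("Planner", "Here is the outline.", "assistant")
recipients = deliver(msg, ["Planner", "Writer", "Reviewer"])
print(recipients)  # ['Writer', 'Reviewer']
```

Consistent, unique name values are what let the hub distinguish senders from recipients, which is why the naming advice above matters.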


Building a Sequential Processing Pipeline

The simplest non-trivial workflow is a sequential chain: Agent A processes input, Agent B processes A’s output, and so on. This pattern is common in content pipelines, data transformation tasks, and staged reasoning workflows.

Here is a complete, runnable example. We will build a three-stage pipeline where a planner agent drafts a plan, a writer agent expands it into prose, and a reviewer agent provides critique.

import asyncio
import os

from agentscope.pipeline import MsgHub, sequential_pipeline
from agentscope.message import Msg
from agentscope.agent import ReActAgent
from agentscope.model import DashScopeChatModel

# Models are configured by direct instantiation in v1.0.0 — swap in any
# supported chat model class and supply your own API key.
model = DashScopeChatModel(
    model_name="qwen-max",
    api_key=os.environ["DASHSCOPE_API_KEY"],
)

async def run_editorial_pipeline():
    # Create agent instances
    planner = ReActAgent(
        name="Planner",
        sys_prompt="You are a content strategist. Given a topic, output a concise 3-point outline.",
        model=model,
    )
    writer = ReActAgent(
        name="Writer",
        sys_prompt="You are a technical writer. Expand the given outline into a well-structured article section.",
        model=model,
    )
    reviewer = ReActAgent(
        name="Reviewer",
        sys_prompt="You are an editor. Review the draft and provide specific, actionable feedback.",
        model=model,
    )

    # Seed message to kick off the workflow
    seed_msg = Msg(
        name="Host",
        content="Topic: How to build a RAG pipeline with a vector database.",
        role="user",
    )

    # Register all agents with the MsgHub and run the pipeline
    async with MsgHub(participants=[planner, writer, reviewer]) as hub:
        # Inject the seed message so agents have context
        await hub.broadcast(seed_msg)

        # Run agents in sequential order
        await sequential_pipeline([planner, writer, reviewer])

asyncio.run(run_editorial_pipeline())

Key observations:

  • Every async call must be awaited — AgentScope v1.0.0 is fully asynchronous by design.
  • hub.broadcast sends a message to all current participants before the pipeline starts, giving every agent the initial context.
  • sequential_pipeline takes a list of agents and calls them in order. Each agent receives the accumulated conversation history managed by the hub.

Advanced Workflows: Parallel and Conditional Execution

Sequential pipelines are predictable but slow when agents can work independently. AgentScope supports concurrent execution for cases where multiple agents can process the same input simultaneously and their outputs are aggregated afterward.
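You do not need AgentScope installed to see why this helps: concurrent stages in asyncio-based frameworks generally boil down to asyncio.gather, which runs awaitables side by side. The following self-contained sketch (fake agents simulated with asyncio.sleep, all names hypothetical) shows the mechanics.

```python
import asyncio
import time

async def fake_agent(name: str, delay: float) -> str:
    # Stand-in for an agent call that waits on a model API
    await asyncio.sleep(delay)
    return f"{name} done"

async def main() -> list[str]:
    start = time.perf_counter()
    # Both "agents" run concurrently, so total wall time is roughly
    # max(delays), not their sum
    results = await asyncio.gather(
        fake_agent("StyleReviewer", 0.2),
        fake_agent("AccuracyReviewer", 0.2),
    )
    print(results, f"{time.perf_counter() - start:.2f}s")
    return results

results = asyncio.run(main())
```

The AgentScope example below applies the same staging idea with real agents.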

import asyncio
from agentscope.pipeline import MsgHub, multiagent_concurrent, sequential_pipeline
from agentscope.message import Msg
from agentscope.agent import ReActAgent

async def run_parallel_review_pipeline():
    drafter = ReActAgent(
        name="Drafter",
        sys_prompt="Draft a technical explanation of the given concept.",
        model=model,
    )
    style_reviewer = ReActAgent(
        name="StyleReviewer",
        sys_prompt="Check the draft for clarity and readability. Provide feedback only.",
        model=model,
    )
    accuracy_reviewer = ReActAgent(
        name="AccuracyReviewer",
        sys_prompt="Check the draft for technical accuracy. Flag any errors.",
        model=model,
    )
    synthesizer = ReActAgent(
        name="Synthesizer",
        sys_prompt="Combine all reviewer feedback into a final, revised article section.",
        model=model,
    )

    seed = Msg(
        name="Host",
        content="Concept: Vector embeddings and cosine similarity in semantic search.",
        role="user",
    )

    async with MsgHub(participants=[drafter, style_reviewer, accuracy_reviewer, synthesizer]) as hub:
        await hub.broadcast(seed)

        # Stage 1: Drafter works alone
        await sequential_pipeline([drafter])

        # Stage 2: Both reviewers work in parallel on the draft
        await multiagent_concurrent([style_reviewer, accuracy_reviewer])

        # Stage 3: Synthesizer combines all feedback
        await sequential_pipeline([synthesizer])

asyncio.run(run_parallel_review_pipeline())

For conditional execution, AgentScope does not have a built-in conditional pipeline primitive. Instead, you handle branching with standard Python control flow inside your async functions, using the output of one agent to decide which pipeline to run next.

async def run_conditional_pipeline():
    classifier = ReActAgent(
        name="Classifier",
        sys_prompt="Classify the user query as 'technical' or 'conceptual'. Output one word only.",
        model=model,
    )
    technical_agent = ReActAgent(
        name="TechnicalExpert",
        sys_prompt="Answer technical questions with code examples.",
        model=model,
    )
    conceptual_agent = ReActAgent(
        name="ConceptExpert",
        sys_prompt="Explain concepts clearly using analogies and plain language.",
        model=model,
    )

    seed = Msg(name="Host", content="What is gradient descent?", role="user")

    async with MsgHub(participants=[classifier, technical_agent, conceptual_agent]) as hub:
        await hub.broadcast(seed)

        # Run classifier first
        classification_result = await classifier(seed)

        # Branch based on classification output (content may be structured
        # rather than a plain string, so coerce to str before matching)
        if "technical" in str(classification_result.content).lower():
            await sequential_pipeline([technical_agent])
        else:
            await sequential_pipeline([conceptual_agent])

asyncio.run(run_conditional_pipeline())

This pattern keeps your orchestration logic readable while giving you full Python expressiveness for complex branching.


Managing State and Data Across the Pipeline

One of the subtler challenges in multi-agent systems is state management — how does the output of Agent A become the input for Agent C, even when Agent B runs in between?

AgentScope handles this through the MsgHub’s internal message history, which is shared across all participants. When Agent A produces a Msg, the hub stores it and makes it available to subsequent agents. You do not need to manually thread outputs through function arguments.

However, there are cases where you need explicit state — for example, accumulating structured data across multiple agents that each contribute a piece. The recommended pattern is to use a shared mutable object passed into your async function’s closure.

async def run_stateful_pipeline():
    shared_state = {"outline": None, "draft": None, "feedback": []}

    planner = ReActAgent(name="Planner", sys_prompt="Create a 3-point outline.", model=model)
    writer = ReActAgent(name="Writer", sys_prompt="Write based on the outline.", model=model)
    critic_a = ReActAgent(name="CriticA", sys_prompt="Critique from a beginner's perspective.", model=model)
    critic_b = ReActAgent(name="CriticB", sys_prompt="Critique from an expert's perspective.", model=model)

    seed = Msg(name="Host", content="Topic: Async Python patterns.", role="user")

    async with MsgHub(participants=[planner, writer, critic_a, critic_b]) as hub:
        await hub.broadcast(seed)

        # Stage 1: Generate outline and capture it
        outline_msg = await planner(seed)
        shared_state["outline"] = outline_msg.content

        # Stage 2: Write the draft using the outline
        draft_msg = await writer(outline_msg)
        shared_state["draft"] = draft_msg.content

        # Stage 3: Parallel critique
        results = await multiagent_concurrent([critic_a, critic_b])
        shared_state["feedback"] = [r.content for r in results]

        # Dynamically add a summarizer and broadcast updated state
        summarizer = ReActAgent(
            name="Summarizer",
            sys_prompt="Summarize the feedback into three actionable bullet points.",
            model=model,
        )
        hub.add(summarizer)
        await hub.broadcast(
            Msg("Host", f"Draft: {shared_state['draft']}\nFeedback: {shared_state['feedback']}", "user")
        )
        await sequential_pipeline([summarizer])

asyncio.run(run_stateful_pipeline())

Notice hub.add(summarizer) — this is MsgHub’s dynamic participant management in action. You can add or remove agents from the hub mid-workflow without restarting the context manager. Similarly, hub.broadcast lets you inject synthetic messages at any point, which is useful for resetting context or injecting structured data into the conversation.

For workflows that involve retrieval (e.g., fetching documents to ground agent responses), note that AgentScope v1.0.0 deprecated the built-in RAG module. You will need to integrate an external retrieval layer or vector database directly — see What Is RAG? Retrieval-Augmented Generation Explained for a framework-agnostic breakdown of how retrieval augmentation works before wiring it into your pipeline.
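As a framework-agnostic illustration, here is a deliberately naive keyword-overlap retriever standing in for a real vector database. The retrieve function is hypothetical, not an AgentScope API; the idea is that you fetch context first, then inject it into the conversation via hub.broadcast before running your pipeline.

```python
# Naive keyword-overlap retrieval, standing in for a real vector store.
# In production you would query your vector database here instead.

def retrieve(query: str, documents: list[str], top_k: int = 1) -> list[str]:
    """Rank documents by how many query words they share."""
    query_words = set(query.lower().split())
    scored = sorted(
        documents,
        key=lambda doc: len(query_words & set(doc.lower().split())),
        reverse=True,
    )
    return scored[:top_k]

docs = [
    "Cosine similarity measures the angle between embedding vectors.",
    "AgentScope pipelines orchestrate multiple agents.",
]
context = retrieve("what is cosine similarity", docs)
print(context[0])

# Inside your workflow, the retrieved text would then be broadcast, e.g.:
# await hub.broadcast(Msg("Retriever", context[0], "system"))
```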


Frequently Asked Questions

Why must all AgentScope pipeline calls use async/await?

AgentScope v1.0.0 was redesigned from the ground up to be fully asynchronous. This enables concurrent agent execution without blocking, which is essential for production workflows where latency matters. If you try to call pipeline functions synchronously, you will get a coroutine object rather than a result. Always define your workflow inside an async def function and run it with asyncio.run() or from within an existing async context.
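A quick stdlib-only demonstration of the failure mode: calling any coroutine function without await hands you a coroutine object instead of a result (the function name here is hypothetical).

```python
import asyncio

async def pipeline_stage() -> str:
    return "done"

# Calling without await does NOT run the coroutine; it just creates it
result = pipeline_stage()
print(type(result).__name__)  # coroutine

# Running it properly yields the actual value
value = asyncio.run(pipeline_stage())
print(value)  # done

result.close()  # silence the "never awaited" warning for the unused coroutine
```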

Can I mix sequential and concurrent pipelines in the same workflow?

Yes, and this is the recommended approach for most real-world workflows. Use sequential_pipeline when agents have strict dependencies (each agent needs the previous one’s output), and use multiagent_concurrent when agents can process the same input independently. You can call these pipeline functions multiple times within a single MsgHub context block, effectively staging your workflow.

What replaced DialogAgent and DictDialogAgent in AgentScope v1.0.0?

Both classes are deprecated in v1.0.0. The recommended replacement is ReActAgent, which supports tool use and structured reasoning via the ReAct (Reasoning + Acting) pattern. For workflows that require structured output, configure your model with an appropriate response format directly on the model object rather than relying on DictDialogAgent.

How do I configure models in AgentScope v1.0.0?

The old configuration file-based model setup is deprecated. You now instantiate model classes directly. For example, to use a DashScope model: model = DashScopeChatModel(model_name="qwen-max", api_key=os.environ["DASHSCOPE_API_KEY"]). Pass the model instance directly to your agent constructor. This makes model configuration explicit and type-safe.

What should I do if I need a conditional branch based on agent output?

AgentScope does not have a built-in conditional pipeline primitive. Use standard Python if/elif/else statements inside your async function. Call the decision-making agent first, inspect its output’s content field, and then call the appropriate pipeline function based on the result. This keeps branching logic in plain Python, which is easier to test and debug than framework-specific DSLs.

Related Articles