> ## Documentation Index
> Fetch the complete documentation index at: https://agno-v2-team-approvals.mintlify.app/llms.txt
> Use this file to discover all available pages before exploring further.
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow import Parallel
from agno.workflow.step import Step
from agno.workflow.types import StepInput, StepOutput
from agno.workflow.workflow import Workflow
def merge_results(step_input: StepInput) -> StepOutput:
    """Merge content from previous steps.

    Reads ``step_input.previous_step_content``, converts it to text, and
    wraps at most the first 500 characters in a ``StepOutput`` whose content
    is prefixed with ``"Merged: "``.

    Args:
        step_input: Input handed to this step; only its
            ``previous_step_content`` attribute is read.

    Returns:
        A ``StepOutput`` with content ``"Merged: <truncated text>"``. Missing
        or otherwise falsy previous content yields ``"Merged: "``.
    """
    previous = step_input.previous_step_content
    # Coerce to text before truncating: slicing only means "first 500
    # characters" for strings, and content that is not a string (e.g. a dict
    # or structured output object) would otherwise raise or be sliced by
    # element instead of by character.
    text = str(previous) if previous else ""
    return StepOutput(content=f"Merged: {text[:500]}")
# Level 3: Mini-workflows for individual research tasks

# Agent behind the "gather" step of the data workflow.
data_agent = Agent(
    name="Data Agent",
    instructions="Gather raw data and statistics on the topic. Be concise (2-3 sentences).",
    model=OpenAIChat(id="gpt-4o-mini"),
)

# Agent behind the "analyze" step of the data workflow.
analysis_agent = Agent(
    name="Analysis Agent",
    instructions="Analyze the data provided. Identify key trends. Be concise (2-3 sentences).",
    model=OpenAIChat(id="gpt-4o-mini"),
)

# Two-step mini-workflow; steps are listed as "gather" followed by "analyze".
data_workflow = Workflow(
    name="Data Collection",
    description="Collects and analyzes raw data",
    steps=[
        Step(agent=data_agent, name="gather"),
        Step(agent=analysis_agent, name="analyze"),
    ],
)
# Agent behind the single "opinion" step of the expert-opinion workflow.
opinion_agent = Agent(
    name="Opinion Agent",
    instructions="Provide expert opinion and perspective on the topic. Be concise (2-3 sentences).",
    model=OpenAIChat(id="gpt-4o-mini"),
)

# Single-step mini-workflow wrapping the opinion agent.
opinion_workflow = Workflow(
    name="Expert Opinion",
    description="Gathers expert perspectives",
    steps=[
        Step(agent=opinion_agent, name="opinion"),
    ],
)
# Level 2: Research workflow with parallel Level 3 workflows
level2_workflow = Workflow(
    name="Comprehensive Research",
    description="Runs data collection and expert opinion in parallel",
    steps=[
        # Each branch is a Step that wraps a whole Level 3 workflow.
        Parallel(
            Step(workflow=data_workflow, name="data_branch"),
            Step(workflow=opinion_workflow, name="opinion_branch"),
            name="parallel_research",
        ),
        # Custom executor step that follows the parallel block.
        Step(executor=merge_results, name="merge"),
    ],
)
# Level 1: Outermost workflow

# Agent behind the final "write" step.
writer = Agent(
    name="Writer",
    instructions="Write a polished short paragraph synthesizing all research provided.",
    model=OpenAIChat(id="gpt-4o-mini"),
)

# The "research" step wraps the entire Level 2 workflow; "write" follows it.
outer_workflow = Workflow(
    name="Full Pipeline",
    description="3-level nested workflow: research (parallel mini-workflows) -> write",
    steps=[
        Step(workflow=level2_workflow, name="research"),
        Step(agent=writer, name="write"),
    ],
)
if __name__ == "__main__":
    # Script entry point: run the outermost workflow on a sample question
    # with streaming enabled.
    outer_workflow.print_response(
        stream=True,
        input="What is the future of renewable energy?",
    )
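## Run the Example

The example uses `OpenAIChat`, so set the `OPENAI_API_KEY` environment variable before running it.

```bash
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/04_workflows/06_advanced_concepts/workflow_as_a_step
pip install agno openai
python deeply_nested_workflow.py
```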