from agno.workflow import Steps, Router, Step, Workflow
# Define two completely different workflows as Steps
image_sequence = Steps(
name="image_generation",
description="Complete image generation and analysis workflow",
steps=[
Step(name="generate_image", agent=image_generator),
Step(name="describe_image", agent=image_describer),
],
)
video_sequence = Steps(
name="video_generation",
description="Complete video production and analysis workflow",
steps=[
Step(name="generate_video", agent=video_generator),
Step(name="describe_video", agent=video_describer),
],
)
def media_sequence_selector(step_input) -> List[Step]:
"""Route to appropriate media generation pipeline"""
if not step_input.input:
return [image_sequence]
message_lower = step_input.input.lower()
if "video" in message_lower:
return [video_sequence]
elif "image" in message_lower:
return [image_sequence]
else:
return [image_sequence] # Default
# Clean workflow with clear branching
media_workflow = Workflow(
name="AI Media Generation Workflow",
description="Generate and analyze images or videos using AI agents",
steps=[
Router(
name="Media Type Router",
description="Routes to appropriate media generation pipeline",
selector=media_sequence_selector,
choices=[image_sequence, video_sequence], # Clear choices
)
],
)
# Usage examples
media_workflow.print_response("Create an image of a magical forest", markdown=True)
media_workflow.print_response("Create a cinematic video of city timelapse", markdown=True)