Custom functions provide maximum flexibility by allowing you to define specific logic for step execution. Use them to preprocess inputs, orchestrate agents and teams, and postprocess outputs with complete programmatic control.
Key Capabilities
- Custom Logic: Implement complex business rules and data transformations
- Agent Integration: Call agents and teams within your custom processing logic
- Data Flow Control: Transform outputs between steps for optimal data handling
Implementation Pattern
Define a Step
with a custom function as the executor
. The function must accept a StepInput
object and return a StepOutput
object, ensuring seamless integration with the workflow system.
Example
content_planning_step = Step(
name="Content Planning Step",
executor=custom_content_planning_function,
)
def custom_content_planning_function(step_input: StepInput) -> StepOutput:
"""
Custom function that does intelligent content planning with context awareness
"""
message = step_input.input
previous_step_content = step_input.previous_step_content
# Create intelligent planning prompt
planning_prompt = f"""
STRATEGIC CONTENT PLANNING REQUEST:
Core Topic: {message}
Research Results: {previous_step_content[:500] if previous_step_content else "No research results"}
Planning Requirements:
1. Create a comprehensive content strategy based on the research
2. Leverage the research findings effectively
3. Identify content formats and channels
4. Provide timeline and priority recommendations
5. Include engagement and distribution strategies
Please create a detailed, actionable content plan.
"""
try:
response = content_planner.run(planning_prompt)
enhanced_content = f"""
## Strategic Content Plan
**Planning Topic:** {message}
**Research Integration:** {"✓ Research-based" if previous_step_content else "✗ No research foundation"}
**Content Strategy:**
{response.content}
**Custom Planning Enhancements:**
- Research Integration: {"High" if previous_step_content else "Baseline"}
- Strategic Alignment: Optimized for multi-channel distribution
- Execution Ready: Detailed action items included
""".strip()
return StepOutput(content=enhanced_content)
except Exception as e:
return StepOutput(
content=f"Custom content planning failed: {str(e)}",
success=False,
)
Standard Pattern
All custom functions follow this consistent structure:
def custom_content_planning_function(step_input: StepInput) -> StepOutput:
# 1. Custom preprocessing
# 2. Call agents/teams as needed
# 3. Custom postprocessing
return StepOutput(content=enhanced_content)
Streaming execution with custom function step on AgentOS:
If you are running an agent or team within the custom function step, you can enable streaming on the AgentOS chat page by setting stream=True
and stream_intermediate_steps=True
when calling run()
or arun()
and yielding the events.
Using the AgentOS, runs will be asynchronous and responses will be streamed.
This means you must keep the custom function step asynchronous, by using .arun()
instead of .run()
to run your Agents or Teams.
custom_function_step_async_stream.py
content_planner = Agent(
name="Content Planner",
model=OpenAIChat(id="gpt-4o"),
instructions=[
"Plan a content schedule over 4 weeks for the provided topic and research content",
"Ensure that I have posts for 3 posts per week",
],
db=InMemoryDb(),
)
async def custom_content_planning_function(
step_input: StepInput,
) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
"""
Custom function that does intelligent content planning with context awareness.
Note: This function calls content_planner.arun() internally, and all events
from that agent call will automatically get workflow context injected by
the workflow execution system - no manual intervention required!
"""
message = step_input.input
previous_step_content = step_input.previous_step_content
# Create intelligent planning prompt
planning_prompt = f"""
STRATEGIC CONTENT PLANNING REQUEST:
Core Topic: {message}
Research Results: {previous_step_content[:500] if previous_step_content else "No research results"}
Planning Requirements:
1. Create a comprehensive content strategy based on the research
2. Leverage the research findings effectively
3. Identify content formats and channels
4. Provide timeline and priority recommendations
5. Include engagement and distribution strategies
Please create a detailed, actionable content plan.
"""
try:
response_iterator = content_planner.arun(
planning_prompt, stream=True, stream_intermediate_steps=True
)
async for event in response_iterator:
yield event
response = content_planner.get_last_run_output()
enhanced_content = f"""
## Strategic Content Plan
**Planning Topic:** {message}
**Research Integration:** {"✓ Research-based" if previous_step_content else "✗ No research foundation"}
**Content Strategy:**
{response.content}
**Custom Planning Enhancements:**
- Research Integration: {"High" if previous_step_content else "Baseline"}
- Strategic Alignment: Optimized for multi-channel distribution
- Execution Ready: Detailed action items included
""".strip()
yield StepOutput(content=enhanced_content)
except Exception as e:
yield StepOutput(
content=f"Custom content planning failed: {str(e)}",
success=False,
)
Developer Resources