Workflow Schema Specification
Version: 0.1.0
Status: Draft
Overview
Workflows compose multiple actions, agents, and logic nodes into orchestrated sequences. They support conditional logic, loops, parallel execution, error handling, data transformation, and various trigger types.
Workflows are backend-agnostic: logic nodes execute natively in the orchestrator, while agent steps may delegate to pluggable backends (native, LangChain, custom adapters). The execution backend can be overridden at the step level. See Execution Backends for backend resolution order and adapter interfaces.
Step Types
Workflows support three primary step categories:
- Action Steps: Execute connector actions
- Agent Steps: Invoke AI agents (with optional backend override)
- Logic Steps: Control flow, branching, loops, transforms
For detailed logic step semantics and the portable node catalog, see Workflow Logic Steps.
Backend Overrides
Agent and connector steps can override the execution backend:
steps:
- id: analyze_data
type: agent
agent: agent:openworkflow/data-analyst@1.0.0
backend: langchain # Override agent's default backend
input:
query: "Analyze sales trends"
See Execution Backends for resolution priority and adapter configuration.
Template Context Model
Workflows use a consistent template expression context for all data references:
- {{ inputs.<field> }} — Workflow-level input parameters
- {{ env.<VAR> }} — Environment variables
- {{ secrets.<NAME> }} — Secure credentials (trusted context only)
- {{ steps.<id>.output.<field> }} — Output from a previous step (by step id)
Note: Always reference step outputs via steps.<id>, where <id> is the step's unique identifier.
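To make the context model concrete, the expressions above can be pictured as rendering against a nested context object. The sketch below uses Jinja2 purely to illustrate the shape of that context; the actual engine and evaluation rules are defined by the runtime.
# Illustrative only: shows the shape of the template context, not the
# runtime's actual engine. Assumes a Jinja2-compatible expression syntax.
from jinja2 import Template

context = {
    "inputs": {"location": "San Francisco"},            # workflow inputs
    "env": {"K8S_CLUSTER": "prod-1"},                    # environment variables
    "secrets": {"API_TOKEN": "***"},                     # trusted context only
    "steps": {                                           # outputs of earlier steps
        "fetch_weather": {"output": {"temperature": 72}},
    },
}

expr = "{{ steps.fetch_weather.output.temperature }}°F in {{ inputs.location }}"
print(Template(expr).render(**context))  # -> "72°F in San Francisco"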
File Format
Workflow files always use the plural workflows: key and contain an array of workflow definitions, so a single file can hold one workflow or many:
workflows:
- name: string # Required: workflow name
version: string # Optional: semantic version (default: "1.0.0")
description: string # Optional: what this workflow does
# ... workflow definition
Single workflow example:
workflows:
- name: Daily Weather Report
version: 1.0.0
# ... steps, triggers, etc.
Multiple workflows example:
workflows:
- name: Daily Weather Report
version: 1.0.0
# ... workflow definition
- name: Weather Alerts
version: 1.0.0
# ... workflow definition
This consistent format simplifies parsing and allows workflows to grow from one to many without restructuring.
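For example, a loader only ever has to handle one top-level shape. A minimal parsing sketch using PyYAML:
# Minimal sketch: load a workflow file and list the workflows it contains.
# The top-level shape is always {"workflows": [...]}, whether the file
# holds one workflow or many.
import yaml

with open("workflows.yaml") as f:
    doc = yaml.safe_load(f)

for wf in doc["workflows"]:
    print(wf["name"], wf.get("version", "1.0.0"))  # version defaults to "1.0.0"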
Basic Structure
Each workflow in the array follows this structure:
workflows:
- name: string # Required: human-readable workflow name
version: string # Optional: semantic version (default: "1.0.0")
description: string # Optional: what this workflow does
# Inputs available to all steps
inputs:
param_name:
type: string|number|boolean|object
description: string
default: any # Optional default value
required: boolean # Default: false
# How this workflow is triggered
triggers:
- type: schedule|webhook|event|manual
# ... trigger-specific config
# Execution steps
steps:
- name: string # Required: unique step identifier
connector: string # Required: connector ID (e.g., connector:community/slack@1.0.0)
action: string # Required: action name
input: # Action input (can reference context)
param: value
# ... step-specific config
# Output values
outputs:
output_name:
value: "{{ steps.step_name.output.field }}"
description: string
Workflow Inputs
Define parameters that can be provided at execution time:
workflows:
- name: Weather Report Generator
inputs:
location:
type: string
description: City name to get weather for
default: "San Francisco"
required: true
units:
type: string
description: Temperature units
enum: [celsius, fahrenheit]
default: celsius
include_forecast:
type: boolean
description: Whether to include 7-day forecast
default: false
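Conceptually, the runtime merges caller-supplied values with declared defaults and enforces required and enum constraints before the first step runs. A rough sketch of that resolution logic (illustrative, not the reference implementation):
# Illustrative input resolution: apply defaults, then enforce the
# `required` and `enum` constraints from the input declarations.
def resolve_inputs(declarations: dict, provided: dict) -> dict:
    resolved = {}
    for name, decl in declarations.items():
        if name in provided:
            resolved[name] = provided[name]
        elif "default" in decl:
            resolved[name] = decl["default"]
        elif decl.get("required", False):
            raise ValueError(f"Missing required input: {name}")
        if name in resolved and "enum" in decl and resolved[name] not in decl["enum"]:
            raise ValueError(f"Input {name} must be one of {decl['enum']}")
    return resolved

declarations = {
    "location": {"type": "string", "default": "San Francisco", "required": True},
    "units": {"type": "string", "enum": ["celsius", "fahrenheit"], "default": "celsius"},
}
print(resolve_inputs(declarations, {"units": "fahrenheit"}))
# -> {'location': 'San Francisco', 'units': 'fahrenheit'}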
Authorization (RBAC)
Control who can execute workflows:
workflows:
- name: Delete User Data
version: 0.1.0
# Access control
authorization:
enabled: true
# Required roles (OR logic - user needs at least one)
roles: [admin, data-team-lead]
# Required permissions (AND logic - user needs all)
permissions:
- workflows:execute
- users:delete
# Approval workflow (optional)
approval:
required: true
approvers: [manager, security-team]
timeout: 3600 # 1 hour to approve
steps:
- name: delete_records
connector: connector:community/database@1.0.0
action: delete
# Step-level authorization (additional check)
authorization:
permissions: [database:write:users]
Authorization Model:
- Users authenticate with identity provider (OAuth2, SAML, etc.)
- Roles and permissions assigned via IAM system
- Workflow execution checks authorization before running
- Failed authorization logged for audit
See Security Specification for complete RBAC details.
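To make the role and permission semantics concrete: roles use OR logic, permissions use AND logic. A simplified sketch of that decision (the real check is made by the runtime against the IAM system):
# Simplified authorization decision: roles are OR'd, permissions are AND'd.
def is_authorized(user_roles: set, user_permissions: set, authz: dict) -> bool:
    if not authz.get("enabled", False):
        return True
    required_roles = authz.get("roles", [])
    if required_roles and not (user_roles & set(required_roles)):
        return False  # user holds none of the required roles
    required_permissions = set(authz.get("permissions", []))
    return required_permissions <= user_permissions  # user must hold all of them

authz = {"enabled": True, "roles": ["admin", "data-team-lead"],
         "permissions": ["workflows:execute", "users:delete"]}
print(is_authorized({"admin"}, {"workflows:execute", "users:delete"}, authz))  # True
print(is_authorized({"admin"}, {"workflows:execute"}, authz))                  # False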
Execute a workflow with inputs:
from openworkflow import Workflows

workflows = Workflows.from_file("workflows.yaml")
workflows.execute("Weather Report Generator", inputs={
    "location": "New York",
    "include_forecast": True
})
Triggers
Manual Trigger
Workflow runs on-demand only:
triggers:
- type: manual
Schedule Trigger
Run on a cron schedule:
triggers:
- type: schedule
cron: "0 8 * * *" # Every day at 8am
timezone: "America/New_York"
Webhook Trigger
Run when an HTTP request is received:
triggers:
- type: webhook
path: /workflows/my-workflow
method: POST
# Authentication
auth:
# HMAC signature (recommended)
type: hmac-sha256
secret_name: WEBHOOK_SECRET
header: X-Hub-Signature-256
# Alternative: Bearer token
# type: bearer
# secret_name: WEBHOOK_TOKEN
# Security options
security:
# IP allowlist
ipAllowlist:
- 192.168.1.0/24
- 10.0.0.5
# Replay attack protection
replayWindow: 300 # 5 minutes
timestampHeader: X-Request-Timestamp
# Rate limiting
rateLimit:
rpm: 60 # Requests per minute
burst: 10
# Payload limits
maxPayloadSize: 1048576 # 1MB
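On the receiving side, HMAC verification amounts to recomputing the signature over the raw request body with the shared secret and comparing it in constant time, and replay protection amounts to rejecting requests whose timestamp falls outside the configured window. The sketch below illustrates both checks; it assumes the signature header carries a hex digest (optionally prefixed with sha256=) and that the timestamp header carries a Unix timestamp, neither of which the spec mandates.
# Sketch of webhook verification matching the config above.
# Assumptions: hex HMAC-SHA256 digest in the signature header (optionally
# "sha256=..." prefixed) and a Unix timestamp in the timestamp header.
import hashlib
import hmac
import time

def verify_webhook(raw_body: bytes, headers: dict, secret: str,
                   replay_window: int = 300) -> bool:
    # Replay protection: reject requests outside the allowed time window.
    ts = int(headers.get("X-Request-Timestamp", "0"))
    if abs(time.time() - ts) > replay_window:
        return False
    # Signature check: recompute the HMAC over the raw body, compare in constant time.
    provided = headers.get("X-Hub-Signature-256", "")
    if provided.startswith("sha256="):
        provided = provided[len("sha256="):]
    expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(provided, expected)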
Webhook payload is available as trigger.payload:
steps:
- name: process_webhook
connector: connector:community/my-connector@1.0.0
action: process
input:
data: "{{ trigger.payload }}"
Event Trigger
Run when an event is received from the configured event bus (Kafka, SNS/SQS, webhooks, etc.):
triggers:
- type: event
source: kafka # Event bus type: kafka | sns | sqs | webhook | custom
topic: user.signup # Topic/queue name
filter: # Optional: filter events
event_type: new_user
Event data is available as trigger.event:
steps:
- name: welcome_user
connector: connector:community/email@1.0.0
action: send
input:
to: "{{ trigger.event.user_email }}"
template: welcome
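The filter block narrows which events start the workflow. One plausible reading is an exact match on event fields; the sketch below illustrates that reading, though the precise matching rules belong to the event trigger implementation.
# Illustrative filter check: every key in the trigger's `filter` block
# must equal the corresponding field of the incoming event.
def matches_filter(event: dict, event_filter: dict) -> bool:
    return all(event.get(key) == value for key, value in event_filter.items())

event = {"event_type": "new_user", "user_email": "jane@example.com"}
print(matches_filter(event, {"event_type": "new_user"}))  # True
print(matches_filter(event, {"event_type": "deleted"}))   # False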
Steps
Basic Step
steps:
- name: fetch_weather
connector: connector:community/weather@1.0.0
action: get_current_weather
input:
location: "{{ inputs.location }}"
Conditional Execution
Run step only if condition is true:
steps:
- name: check_temperature
connector: connector:community/weather@1.0.0
action: get_current_weather
input:
location: "San Francisco"
- name: send_heat_alert
connector: connector:community/slack@1.2.0
action: post_message
condition: "{{ steps.check_temperature.output.temperature > 90 }}"
input:
channel: "#alerts"
text: "Heat alert! Temperature is {{ steps.check_temperature.output.temperature }}°F"
Loops
Iterate over arrays:
steps:
- name: get_cities
connector: connector:community/database@1.0.0
action: query
input:
sql: "SELECT city FROM locations"
- name: fetch_weather_for_city
connector: connector:community/weather@1.0.0
action: get_current_weather
for_each: "{{ steps.get_cities.output.cities }}"
input:
location: "{{ item.city }}"
Each iteration's output is collected into an array (iterations run sequentially by default; set parallel: true to run them concurrently):
{
"fetch_weather_for_city": {
"output": [
{"temperature": 72, "city": "San Francisco"},
{"temperature": 85, "city": "Los Angeles"}
]
}
}
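Conceptually, the orchestrator runs the step once per item and gathers the results in input order, one at a time by default or concurrently when parallel: true is set. A rough illustration of that behavior (not the actual scheduler):
# Rough sketch of for_each semantics: sequential by default,
# concurrent when parallel: true. Results keep the input order.
from concurrent.futures import ThreadPoolExecutor

def run_for_each(items, run_step, parallel=False):
    if not parallel:
        return [run_step(item) for item in items]   # one iteration at a time
    with ThreadPoolExecutor() as pool:
        return list(pool.map(run_step, items))      # iterations run concurrently

def fake_weather(item):
    return {"city": item["city"], "temperature": 72}

cities = [{"city": "San Francisco"}, {"city": "Los Angeles"}]
print(run_for_each(cities, fake_weather, parallel=True))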
Parallel Execution
Run multiple steps concurrently:
steps:
- name: parallel_tasks
parallel:
- name: fetch_weather
connector: connector:community/weather@1.0.0
action: get_current_weather
input:
location: "San Francisco"
- name: fetch_news
connector: connector:community/news@1.0.0
action: get_headlines
input:
topic: weather
- name: fetch_traffic
connector: connector:community/traffic@1.0.0
action: get_conditions
input:
city: "San Francisco"
- name: generate_report
connector: connector:community/reporting@1.0.0
action: create
input:
weather: "{{ steps.parallel_tasks.fetch_weather.output }}"
news: "{{ steps.parallel_tasks.fetch_news.output }}"
traffic: "{{ steps.parallel_tasks.fetch_traffic.output }}"
Error Handling
Handle step failures gracefully:
steps:
- name: risky_operation
connector: connector:community/external-api@1.0.0
action: call
input:
endpoint: "https://api.example.com/data"
# Retry configuration
retry:
max_attempts: 3
backoff: exponential
backoff_factor: 2
# Continue workflow even if this fails (sets status to 'failed' but doesn't halt workflow)
continue_on_error: true
- name: handle_failure
connector: connector:community/logging@1.0.0
action: log_error
condition: "{{ steps.risky_operation.status == 'failed' }}"
input:
message: "Operation failed: {{ steps.risky_operation.error }}"
Timeout
Limit step execution time:
steps:
- name: slow_operation
connector: connector:community/data-processing@1.0.0
action: process_large_dataset
timeout: 300 # 5 minutes
input:
dataset_id: "12345"
Data Transformation
Use template expressions to transform data:
steps:
- name: fetch_user
connector: connector:community/database@1.0.0
action: query
input:
sql: "SELECT * FROM users WHERE id = {{ inputs.user_id }}"
- name: send_email
connector: connector:community/email@1.0.0
action: send
input:
to: "{{ steps.fetch_user.output.email }}"
subject: "Hello {{ steps.fetch_user.output.first_name }}!"
body: |
Hi {{ steps.fetch_user.output.first_name }} {{ steps.fetch_user.output.last_name }},
Your account status: {{ steps.fetch_user.output.status | upper }}
# Template filters available:
# - upper, lower, title: string case
# - default: default value if null
# - length: array/string length
# - json: serialize to JSON
# - round: round number
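To make the filters concrete, the snippet below shows their effect under a Jinja2-compatible engine (an assumption about the template engine; json is registered here as a custom filter because it is not a Jinja2 built-in):
# Filter behavior demo under a Jinja2-compatible engine. The `json`
# filter is not a Jinja2 built-in, so it is registered manually here.
import json
from jinja2 import Environment

jinja_env = Environment()
jinja_env.filters["json"] = json.dumps

template = jinja_env.from_string(
    "{{ status | upper }} / {{ missing | default('n/a') }} / "
    "{{ items | length }} items / {{ price | round(2) }} / {{ items | json }}"
)
print(template.render(status="active", items=["a", "b"], price=3.14159))
# -> ACTIVE / n/a / 2 items / 3.14 / ["a", "b"]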
Workflow Outputs
Define values to return from workflow execution:
workflows:
- name: Data Pipeline
steps:
- name: fetch_data
connector: connector:community/database@1.0.0
action: query
# ...
- name: process_data
connector: connector:community/transform@1.0.0
action: process
# ...
outputs:
record_count:
value: "{{ steps.fetch_data.output.count }}"
description: Number of records processed
result_url:
value: "{{ steps.process_data.output.url }}"
description: URL to processed data
success:
value: "{{ steps.process_data.status == 'success' }}"
description: Whether pipeline completed successfully
Outputs are returned from execution:
result = workflow.execute()
print(result.outputs["record_count"]) # 1000
print(result.outputs["result_url"]) # https://...
Subworkflows
Call other workflows as steps:
steps:
- name: run_etl
workflow: data-etl-pipeline
version: "1.2.0" # Optional: specific version
input:
source: "{{ inputs.data_source }}"
destination: "{{ inputs.data_dest }}"
- name: notify_completion
connector: connector:community/slack@1.2.0
action: post_message
input:
channel: "#data-team"
text: "ETL completed: {{ steps.run_etl.outputs.record_count }} records"
Environment Variables
Reference environment variables:
steps:
- name: deploy
connector: connector:community/kubernetes@1.0.0
action: deploy
input:
cluster: "{{ env.K8S_CLUSTER }}"
namespace: "{{ env.K8S_NAMESPACE }}"
image: "myapp:{{ env.VERSION }}"
Secrets
Access secure credentials:
steps:
- name: call_api
connector: connector:community/http@1.0.0
action: request
input:
url: "https://api.example.com/data"
headers:
Authorization: "Bearer {{ secrets.API_TOKEN }}"
Security:
- Secrets are not logged or exposed in workflow history
- User inputs cannot access {{ secrets.* }} (sandboxed context)
- Runtime detects and blocks secret exposure in outputs
- See Security for secret redaction policies
❌ DANGEROUS - Secret Leakage:
# This will be BLOCKED at runtime
steps:
- name: debug_output
connector: connector:community/slack@1.2.0
action: post_message
input:
text: "Debug: {{ secrets.DATABASE_URL }}" # ❌ Blocked
Complete Examples
Example Workflow
workflows:
- name: Daily Sales Report
version: 0.1.0
description: Generate and distribute daily sales report
inputs:
report_date:
type: string
description: Date for report (YYYY-MM-DD)
default: "{{ today }}"
recipients:
type: array
description: Email addresses to send report to
required: true
triggers:
- type: schedule
cron: "0 9 * * *" # Every day at 9am
timezone: "America/New_York"
steps:
# Fetch sales data
- name: fetch_sales
connector: connector:community/database@1.0.0
action: query
input:
sql: |
SELECT product, SUM(amount) as total
FROM sales
WHERE date = '{{ inputs.report_date }}'
GROUP BY product
timeout: 60
# Calculate metrics
- name: calculate_metrics
connector: connector:community/analytics@1.0.0
action: compute
input:
data: "{{ steps.fetch_sales.output.rows }}"
metrics: [total, average, top_products]
# Generate chart
- name: create_chart
connector: connector:community/visualization@1.0.0
action: create_chart
input:
type: bar
data: "{{ steps.calculate_metrics.output }}"
title: "Sales by Product - {{ inputs.report_date }}"
# Upload to storage
- name: upload_chart
connector: connector:community/s3@1.0.0
action: upload
input:
bucket: "company-reports"
key: "sales/{{ inputs.report_date }}.png"
file: "{{ steps.create_chart.output.image_data }}"
# Send emails in parallel
- name: send_reports
connector: connector:community/email@1.0.0
action: send
for_each: "{{ inputs.recipients }}"
parallel: true
input:
to: "{{ item }}"
subject: "Daily Sales Report - {{ inputs.report_date }}"
body: |
Hi there,
Here's your daily sales report for {{ inputs.report_date }}:
Total Sales: ${{ steps.calculate_metrics.output.total }}
Average: ${{ steps.calculate_metrics.output.average }}
Top Product: {{ steps.calculate_metrics.output.top_products[0] }}
Chart: {{ steps.upload_chart.output.url }}
attachments:
- url: "{{ steps.upload_chart.output.url }}"
filename: "sales_chart.png"
# Log completion
- name: log_success
connector: connector:community/logging@1.0.0
action: info
input:
message: "Sales report generated and sent to {{ inputs.recipients | length }} recipients"
outputs:
total_sales:
value: "{{ steps.calculate_metrics.output.total }}"
description: Total sales amount
report_url:
value: "{{ steps.upload_chart.output.url }}"
description: URL to chart image
recipients_count:
value: "{{ inputs.recipients | length }}"
description: Number of recipients
Multiple Workflows in One File
Example showing multiple workflows bundled together:
workflows:
# Development workflow
- name: Run Tests
version: 0.1.0
description: Execute test suite for the connector
triggers:
- type: webhook
path: /test
method: POST
steps:
- name: run_unit_tests
connector: connector:community/testing@1.0.0
action: run_tests
input:
suite: unit
coverage: true
- name: run_integration_tests
connector: connector:community/testing@1.0.0
action: run_tests
input:
suite: integration
- name: publish_results
connector: connector:community/slack@1.2.0
action: post_message
input:
channel: "#ci-cd"
text: "Tests completed: {{ steps.run_unit_tests.output.passed }}/{{ steps.run_unit_tests.output.total }} passed"
# Deployment workflow
- name: Deploy Connector
version: 0.1.0
description: Deploy connector to production
inputs:
environment:
type: string
enum: [staging, production]
default: staging
triggers:
- type: webhook
path: /deploy
method: POST
steps:
- name: build_image
connector: connector:community/docker@1.0.0
action: build
input:
dockerfile: ./Dockerfile
tags: ["connector:{{ inputs.environment }}"]
- name: push_image
connector: connector:community/docker@1.0.0
action: push
input:
image: "{{ steps.build_image.output.image_id }}"
- name: deploy_to_k8s
connector: connector:community/kubernetes@1.0.0
action: apply
input:
manifest: ./k8s/{{ inputs.environment }}.yaml
namespace: smartify-connectors
- name: verify_health
connector: connector:community/http@1.0.0
action: get
input:
url: "https://{{ inputs.environment }}.openworkflow.ai/healthz"
retry:
max_attempts: 5
backoff: exponential
# Monitoring workflow
- name: Health Check
version: 0.1.0
description: Periodic health monitoring
triggers:
- type: schedule
cron: "*/5 * * * *" # Every 5 minutes
steps:
- name: check_endpoint
connector: connector:community/http@1.0.0
action: get
input:
url: "https://api.openworkflow.ai/connectors/weather/healthz"
timeout: 10
- name: alert_on_failure
connector: connector:community/pagerduty@1.0.0
action: create_incident
condition: "{{ steps.check_endpoint.status != 'success' }}"
input:
severity: high
title: "Weather connector health check failed"
description: "{{ steps.check_endpoint.error }}"
Validation
Validate workflows using the OpenWorkflow CLI:
# Validate single workflow
smartify workflow validate my-workflow.yaml
# Validate file with multiple workflows
smartify workflow validate workflows.yaml
Execution
Single Workflow
# Execute with default inputs
smartify workflow run my-workflow.yaml
# Execute with custom inputs
smartify workflow run my-workflow.yaml --input location="New York" --input units=fahrenheit
# Execute in cloud
smartify workflow run my-workflow.yaml --cloud
# Dry run (validate without executing)
smartify workflow run my-workflow.yaml --dry-run
Multiple Workflows
# Execute specific workflow by name
smartify workflow run workflows.yaml --workflow "Run Tests"
# List all workflows in file
smartify workflow list workflows.yaml
# Execute all workflows
smartify workflow run workflows.yaml --all
SDK Usage
Single workflow:
from openworkflow import Workflow
workflow = Workflow.from_file("my-workflow.yaml")
result = workflow.execute()
Multiple workflows:
from openworkflow import Workflows
# Load all workflows
workflows = Workflows.from_file("workflows.yaml")
# Execute specific workflow
result = workflows.execute("Run Tests")
# Execute all workflows
results = workflows.execute_all()
# Iterate workflows
for workflow in workflows:
    print(f"Workflow: {workflow.name}")
    result = workflow.execute()
Dry Run & Testing
Test workflows before deployment:
workflows:
- name: Data Pipeline
version: 0.1.0
# Testing configuration
testing:
# Dry run mode
dryRun:
enabled: true
mockConnectors: true # Use mocked responses
mockData:
connector:community/database@1.0.0:
query: {rows: [{id: 1, name: "test"}]}
# Test fixtures
fixtures:
- name: sample_user
inputs:
user_id: "test_123"
expectedOutputs:
status: "success"
user_name: "Test User"
- name: edge_case_empty
inputs:
user_id: "nonexistent"
expectedOutputs:
status: "error"
error_code: "USER_NOT_FOUND"
CLI Usage:
# Dry run without executing connectors
smartify workflow run workflow.yaml --dry-run
# Test with specific fixture
smartify workflow test workflow.yaml --fixture sample_user
# Validate workflow structure
smartify workflow validate workflow.yaml
Per-Step Guardrails
Add guardrails to individual steps:
steps:
- name: process_customer_data
connector: connector:community/ai-processor@1.0.0
action: analyze
# Step-level guardrails (overrides agent defaults)
guardrails:
# Content filtering
piiDetection: true # Block if PII detected in output
toxicityThreshold: 0.7
blockedTopics: [violence, hate-speech, medical-advice]
# Cost controls
maxCost: 0.10 # USD per execution
maxTokens: 1000
# Behavioral constraints
maxToolCalls: 5
timeLimit: 30 # Seconds
# Output validation
outputSchema:
type: object
required: [sentiment, summary]
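The outputSchema block reads like a JSON Schema fragment. Assuming that dialect, enforcement could look like the sketch below using the jsonschema package; the spec itself does not prescribe a validator.
# Sketch: validate a step's output against its outputSchema guardrail,
# assuming the schema is expressed as JSON Schema.
from jsonschema import ValidationError, validate

output_schema = {"type": "object", "required": ["sentiment", "summary"]}

def enforce_output_schema(output: dict, schema: dict) -> dict:
    try:
        validate(instance=output, schema=schema)
    except ValidationError as err:
        raise RuntimeError(f"Guardrail violation: {err.message}") from err
    return output

print(enforce_output_schema({"sentiment": "positive", "summary": "All good"}, output_schema))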
Best Practices
- Descriptive names: Use clear step names that describe their purpose
- Error handling: Add retry logic and error handling for external services
- Timeouts: Set appropriate timeouts for long-running operations
- Idempotency: Design workflows to be safely re-runnable
- Testing: Use dry-run and fixtures before production deployment
- Logging: Add logging steps for debugging and audit trails
- Guardrails: Apply step-level guardrails for sensitive operations
- Documentation: Add descriptions to inputs, outputs, and complex steps
Next Steps
- Connector Schema - Define reusable connectors
- SDK Contract - Execute workflows programmatically
- Execution Modes - Local vs. cloud execution