Workflow Logic Steps Specification
Version: 0.1.0 Status: Draft
Overview
Logic steps are first-class workflow step types that enable control flow, branching, iteration, error handling, and data transformation without requiring external connectors or agents. They are semantically minimal, portable across execution backends, and designed for day-1 availability.
Design Principles
Determinism: Given the same inputs and state, logic steps must produce the same routing decisions. Implementations should avoid non-deterministic behavior (randomness, timestamps in conditions) unless explicitly required.
Observability: All logic step executions emit span attributes for tracing:
step.type— The logic step type (if, switch, forEach, etc.)step.id— The unique step identifierstep.branch— The branch taken (for if/switch)step.iterations— Number of iterations (for forEach)step.join_strategy— Join semantics (for parallel)
Step Types
OpenWorkflow workflows support 8 core logic step types:
- If / Condition - Conditional branching based on expression evaluation
- Switch / Route - Multi-way branching based on value matching
- For-Each / Loop - Iterate over arrays with subgraph execution
- Parallel / Map - Fan-out to multiple branches with join semantics
- Wait / Delay / Until - Pause execution for duration or event
- Try / Catch / Retry - Error handling with retry policies
- Transform - Pure data transformation without side effects
- Handoff - Agent switching at workflow level
1. If / Condition Step
Evaluate an expression and route to true or false branches.
Schema
steps:
- id: check_temperature
type: if
condition: "{{ steps.fetch_weather.output.temperature > 30 }}"
then:
- id: send_heat_alert
type: action
connector: connector:community/slack@1.2.0
action: sendMessage
input:
channel: "#alerts"
text: "High temperature alert: {{ steps.fetch_weather.output.temperature }}°C"
else:
- id: log_normal
type: action
connector: connector:community/logging@1.0.0
action: info
input:
message: "Temperature is normal"
Fields
{
"id": string, // Required: unique step ID
"type": "if", // Required
"condition": string, // Required: expression to evaluate
"then": Step[], // Required: steps to execute if true
"else": Step[] // Optional: steps to execute if false
}
Expression Syntax
Conditions support:
- Comparisons:
>,<,>=,<=,==,!= - Logical:
&&,||,! - Template references:
{{ steps.step_id.output.field }} - Functions:
length(),contains(),startsWith(),endsWith()
Examples:
# Simple comparison
condition: "{{ steps.check_user.output.age >= 18 }}"
# Logical operators
condition: "{{ steps.check_balance.output.amount > 100 && steps.check_status.output.active }}"
# String matching
condition: "{{ steps.classify.output.category == 'urgent' }}"
# Array length
condition: "{{ steps.fetch_results.output.items | length > 0 }}"
# Contains check
condition: "{{ steps.get_tags.output | contains('priority') }}"
Output
If step produces:
{
"branch": "then" | "else",
"conditionResult": true | false
}
2. Switch / Route Step
Match a value against multiple cases and route to the corresponding branch.
Schema
steps:
- id: route_by_priority
type: switch
value: "{{ steps.classify_ticket.output.priority }}"
cases:
high:
- id: notify_oncall
type: action
connector: connector:community/pagerduty@1.0.0
action: createIncident
medium:
- id: create_ticket
type: action
connector: connector:community/jira@2.0.0
action: createIssue
low:
- id: queue_for_review
type: action
connector: connector:community/queue@1.0.0
action: enqueue
default:
- id: log_unknown
type: action
connector: connector:community/logging@1.0.0
action: warn
Fields
{
"id": string,
"type": "switch",
"value": string, // Required: value or expression to match
"cases": { // Required: case-to-steps mapping
[key: string]: Step[]
},
"default": Step[] // Optional: default case
}
Output
{
"case": "high" | "medium" | "low" | "default",
"matchedValue": "..."
}
3. For-Each / Loop Step
Iterate over an array and execute a subgraph for each item.
Schema
steps:
- id: process_users
type: forEach
items: "{{ steps.fetch_users.output.users }}"
maxConcurrency: 5
ordered: false
steps:
- id: send_welcome_email
type: action
connector: connector:community/sendgrid@1.0.0
action: sendEmail
input:
to: "{{ item.email }}"
template: "welcome"
- id: create_profile
type: action
connector: connector:community/database@1.0.0
action: insert
input:
table: "profiles"
data: "{{ item }}"
Fields
{
"id": string,
"type": "forEach",
"items": string, // Required: array expression
"maxConcurrency": number, // Optional: max parallel executions, default unlimited
"ordered": boolean, // Optional: preserve order, default false
"continueOnError": boolean, // Optional: continue if item fails, default false
"steps": Step[] // Required: subgraph to execute per item
}
Item Context
Inside the loop, access the current item:
input:
value: "{{ item }}" # Current item
index: "{{ index }}" # Current index (0-based)
isFirst: "{{ isFirst }}" # Boolean: first iteration
isLast: "{{ isLast }}" # Boolean: last iteration
Output
{
"results": [ // Array of results (if ordered: true)
{"status": "success", "output": {...}},
{"status": "error", "error": "..."}
],
"successCount": 8,
"errorCount": 2,
"totalCount": 10
}
4. Parallel / Map Step
Execute multiple branches concurrently and join results.
Schema
steps:
- id: gather_data
type: parallel
join: all # all | any | count(n)
timeout: 60
branches:
weather:
- id: fetch_weather
type: action
connector: connector:community/weather@1.0.0
action: getCurrentWeather
news:
- id: fetch_news
type: action
connector: connector:community/news@1.0.0
action: getHeadlines
stocks:
- id: fetch_stocks
type: action
connector: connector:community/finance@1.0.0
action: getQuotes
Fields
{
"id": string,
"type": "parallel",
"join": "all" | "any" | {"count": number}, // Required: join strategy
"timeout": number, // Optional: max wait time in seconds
"continueOnError": boolean, // Optional: don't fail if one branch errors
"branches": { // Required: named branches
[name: string]: Step[]
}
}
Join Strategies
- all: Wait for all branches to complete (default)
- any: Return as soon as any branch completes
- count(n): Return when N branches complete
Output
{
"branches": {
"weather": {"status": "success", "output": {...}},
"news": {"status": "success", "output": {...}},
"stocks": {"status": "error", "error": "..."}
},
"completedCount": 2,
"errorCount": 1,
"joinStrategy": "all"
}
5. Wait / Delay / Until Step
Pause execution for a duration or until an event occurs.
Schema
Duration-based wait:
steps:
- id: wait_before_retry
type: wait
duration: 30s # 30 seconds
Event-based wait:
steps:
- id: wait_for_approval
type: wait
until:
event: "approval.received"
timeout: 86400 # 24 hours
channel: "webhook"
Scheduled wait:
steps:
- id: wait_until_time
type: wait
until:
time: "2025-10-08T09:00:00Z"
Fields
{
"id": string,
"type": "wait",
// Option 1: Duration
"duration": string, // e.g., "5s", "10m", "1h", "2d"
// Option 2: Until event
"until": {
"event": string, // Event name to wait for
"timeout": number, // Max wait time in seconds
"channel": "webhook" | "kafka" | "pubsub",
"filter": object // Optional: event filter
},
// Option 3: Until timestamp
"until": {
"time": string // ISO 8601 timestamp
}
}
Output
{
"waitType": "duration" | "event" | "time",
"waitedFor": 30, // seconds
"eventReceived": {...}, // if event-based
"timedOut": false
}
6. Try / Catch / Retry Step
Wrap a subgraph with error handling and retry logic.
Schema
steps:
- id: reliable_api_call
type: try
retry:
maxAttempts: 3
backoff: exponential
backoffFactor: 2
retryOn: ["timeout", "rate_limit"]
steps:
- id: call_external_api
type: action
connector: connector:acme/external-api@1.0.0
action: fetchData
catch:
- id: log_failure
type: action
connector: connector:community/logging@1.0.0
action: error
input:
message: "API call failed: {{ error.message }}"
- id: send_alert
type: action
connector: connector:community/slack@1.2.0
action: sendMessage
input:
channel: "#ops"
text: "External API failure"
Fields
{
"id": string,
"type": "try",
"steps": Step[], // Required: steps to execute
"retry": { // Optional: retry policy
"maxAttempts": number, // Max retry attempts
"backoff": "fixed" | "linear" | "exponential",
"backoffFactor": number, // Multiplier for backoff
"retryOn": string[] // Error codes to retry on
},
"catch": Step[], // Optional: error handler steps
"finally": Step[] // Optional: always execute
}
Error Context
Inside catch and finally blocks:
input:
errorMessage: "{{ error.message }}"
errorCode: "{{ error.code }}"
errorDetails: "{{ error.details }}"
attemptCount: "{{ error.attemptCount }}"
Output
{
"status": "success" | "error",
"attempts": 2,
"output": {...}, // if success
"error": {...} // if error after all retries
}
7. Transform Step
Pure data transformation without side effects.
Schema
steps:
- id: format_response
type: transform
language: jq # jq | jsonpath | template | javascript
expression: |
{
"summary": "\(.count) items found",
"items": [.results[] | {id: .id, name: .name}],
"total": (.results | length)
}
Template-based:
steps:
- id: build_message
type: transform
language: template
expression: |
Hello {{ steps.fetch_user.output.name }},
Your order #{{ steps.create_order.output.id }} has been confirmed.
Total: ${{ steps.calculate_total.output.amount }}
JavaScript sandbox:
steps:
- id: custom_transform
type: transform
language: javascript
expression: |
const input = context.steps.fetch_data.output;
return {
processed: input.items.map(x => x.value * 2),
count: input.items.length
};
Fields
{
"id": string,
"type": "transform",
"language": "jq" | "jsonpath" | "template" | "javascript" | "python",
"expression": string, // Required: transformation expression
"timeout": number // Optional: execution timeout
}
Supported Languages
- jq: JSON query language (powerful, sandboxed)
- jsonpath: JSONPath expressions
- template: Template strings with
{{ }}syntax - javascript: Sandboxed JavaScript (limited APIs)
- python: Sandboxed Python (limited imports)
Security
Sandboxed environments with:
- No file system access
- No network access
- Limited memory and CPU
- Timeout enforcement
Output
{
"result": {...} // Transformed data
}
8. Handoff Step
Switch to a different agent and continue execution.
Schema
steps:
- id: escalate_to_human
type: handoff
agent: agent:openworkflow/human-support@1.0.0
reason: "Customer requested human agent"
context:
conversationHistory: "{{ steps.chat.output.history }}"
userProfile: "{{ steps.fetch_user.output }}"
resumeAfter: true # Continue workflow after handoff
Agent selection by condition:
steps:
- id: intelligent_routing
type: handoff
selectAgent:
- condition: "{{ steps.classify.output.category == 'technical' }}"
agent: agent:acme/technical-support@1.0.0
- condition: "{{ steps.classify.output.category == 'billing' }}"
agent: agent:acme/billing-support@1.0.0
- default: agent:acme/general-support@1.0.0
context:
ticket: "{{ steps.create_ticket.output }}"
Fields
{
"id": string,
"type": "handoff",
// Option 1: Direct agent reference
"agent": string, // Agent ID
// Option 2: Conditional selection
"selectAgent": [
{
"condition": string,
"agent": string
}
],
"reason": string, // Optional: handoff reason
"context": object, // Optional: context to pass
"resumeAfter": boolean, // Optional: continue after handoff
"timeout": number // Optional: handoff timeout
}
Output
{
"handoffTo": "agent:openworkflow/human-support@1.0.0",
"reason": "Customer requested human agent",
"agentResponse": {...}, // Response from new agent
"conversationId": "conv_123"
}
Complete Example: Order Processing Workflow
workflows:
- name: Order Processing Pipeline
version: 1.0.0
inputs:
orderId:
type: string
required: true
steps:
# 1. Try block with retry
- id: fetch_order
type: try
retry:
maxAttempts: 3
backoff: exponential
steps:
- id: get_order_details
type: action
connector: connector:acme/orders@1.0.0
action: getOrder
input:
orderId: "{{ inputs.orderId }}"
catch:
- id: log_fetch_error
type: action
connector: connector:community/logging@1.0.0
action: error
# 2. Transform data
- id: format_order
type: transform
language: jq
expression: |
{
"id": .order_id,
"items": [.line_items[] | {sku: .sku, qty: .quantity}],
"total": .total_amount
}
# 3. Parallel processing
- id: validate_and_process
type: parallel
join: all
branches:
inventory:
- id: check_inventory
type: action
connector: connector:acme/inventory@1.0.0
action: checkAvailability
payment:
- id: process_payment
type: action
connector: connector:acme/stripe@2.0.0
action: capturePayment
shipping:
- id: calculate_shipping
type: action
connector: connector:acme/shippo@1.0.0
action: getRates
# 4. Conditional routing
- id: check_inventory_status
type: if
condition: "{{ steps.validate_and_process.branches.inventory.output.available }}"
then:
# 5. Loop through items
- id: fulfill_items
type: forEach
items: "{{ steps.format_order.result.items }}"
maxConcurrency: 3
steps:
- id: ship_item
type: action
connector: connector:acme/fulfillment@1.0.0
action: shipItem
input:
sku: "{{ item.sku }}"
quantity: "{{ item.qty }}"
else:
# 6. Switch routing by backorder policy
- id: route_backorder
type: switch
value: "{{ steps.fetch_order.output.backorderPolicy }}"
cases:
partial:
- id: partial_fulfillment
type: action
connector: connector:acme/fulfillment@1.0.0
action: partialShip
hold:
- id: wait_for_stock
type: wait
duration: 7d
cancel:
- id: cancel_order
type: action
connector: connector:acme/orders@1.0.0
action: cancelOrder
# 7. Wait for shipping confirmation
- id: wait_for_shipment
type: wait
until:
event: "shipment.confirmed"
timeout: 86400
channel: "webhook"
# 8. Handoff for complex issues
- id: check_if_escalation_needed
type: if
condition: "{{ steps.wait_for_shipment.timedOut }}"
then:
- id: escalate_to_agent
type: handoff
agent: agent:acme/fulfillment-specialist@1.0.0
reason: "Shipment delayed beyond SLA"
context:
order: "{{ steps.fetch_order.output }}"
shipmentStatus: "{{ steps.wait_for_shipment.output }}"
outputs:
orderStatus:
value: "{{ steps.fulfill_items.output.status }}"
trackingNumbers:
value: "{{ steps.fulfill_items.output.results }}"
Best Practices
- Use If for simple branching: Binary decisions
- Use Switch for multi-way routing: Multiple distinct paths
- Limit loop concurrency: Prevent resource exhaustion
- Set timeouts on waits: Always have a timeout
- Retry idempotent operations only: Ensure safe retries
- Transform close to data source: Minimize data movement
- Handoff with full context: Pass conversation history
- Error handling at boundaries: Wrap external calls in try/catch
Expression Language Reference
Operators
- Comparison:
>,<,>=,<=,==,!= - Logical:
&&,||,! - Arithmetic:
+,-,*,/,% - String:
+(concatenation)
Functions
length(array|string): Get lengthcontains(array|string, value): Check containmentstartsWith(string, prefix): String starts withendsWith(string, suffix): String ends withupper(string): Convert to uppercaselower(string): Convert to lowercasetrim(string): Remove whitespacesplit(string, delimiter): Split stringjoin(array, delimiter): Join arraymap(array, expression): Transform arrayfilter(array, condition): Filter arraysum(array): Sum numbersavg(array): Average numbersmin(array): Minimum valuemax(array): Maximum value
Context Access
{{ inputs.paramName }} # Workflow inputs
{{ steps.stepId.output.field }} # Step outputs
{{ steps.stepId.status }} # Step status
{{ item }} # Current loop item
{{ index }} # Current loop index
{{ error.message }} # Error details (in catch)
{{ env.VARIABLE }} # Environment variable
{{ secrets.SECRET_NAME }} # Secret value (trusted context only)
{{ trigger.payload }} # Trigger payload (webhook)
{{ trigger.event }} # Trigger event data
Validation Rules
- Step IDs must be unique within a workflow
- Condition expressions must be valid syntax
- Branch references must exist in switch cases
- Loop items must be arrays or array expressions
- Parallel branches must be named uniquely
- Wait durations must be valid time strings
- Transform expressions must be valid for selected language
- Agent references must be valid agent IDs
Next Steps
- Workflow Schema - Core workflow specification
- Agent Schema - Agent handoff targets
- Execution Backends - Runtime behavior