Skip to content

MapReduce Workflows

Quick Start

Want to get started with MapReduce? Here's a minimal working example:

name: my-first-mapreduce
mode: mapreduce

# Generate work items
setup:
  - shell: "echo '[{\"id\": 1, \"name\": \"task-1\"}, {\"id\": 2, \"name\": \"task-2\"}]' > items.json"

# Process items in parallel
map:
  input: "items.json"
  json_path: "$[*]"
  agent_template:
    - shell: "echo Processing ${item.name}"
  max_parallel: 5

# Aggregate results
reduce:
  - shell: "echo Completed ${map.successful}/${map.total} items"

Run it:

prodigy run workflow.yml

That's it! Now let's explore the full capabilities.

Complete Structure

name: parallel-processing
mode: mapreduce

# Optional setup phase
setup:
  - shell: "generate-work-items.sh"
  - shell: "debtmap analyze . --output items.json"

# Map phase: Process items in parallel
map:
  # Input source (JSON file or command)
  input: "items.json"

  # JSONPath expression to extract items
  json_path: "$.items[*]"

  # Agent template (commands run for each item)
  # Modern syntax: Commands directly under agent_template
  agent_template:
    - claude: "/process '${item}'"
    - shell: "test ${item.path}"
      on_failure:
        claude: "/fix-issue '${item}'"

  # DEPRECATED: Nested 'commands' syntax (still supported)
  # agent_template:
  #   commands:
  #     - claude: "/process '${item}'"

  # Maximum parallel agents (can use environment variables)
  max_parallel: 10  # or max_parallel: "$MAX_WORKERS"

  # Optional: Filter items
  filter: "item.score >= 5"

  # Optional: Sort items
  sort_by: "item.priority DESC"

  # Optional: Limit number of items
  max_items: 100

  # Optional: Skip items
  offset: 10

  # Optional: Deduplicate by field
  distinct: "item.id"

  # Optional: Agent timeout in seconds
  agent_timeout_secs: 300

  # Optional: Advanced timeout configuration (alternative to agent_timeout_secs)
  # timeout_config:
  #   default: "5m"
  #   per_command: "2m"

# Reduce phase: Aggregate results
# Modern syntax: Commands directly under reduce
reduce:
  - claude: "/summarize ${map.results}"
  - shell: "echo 'Processed ${map.successful}/${map.total} items'"

# DEPRECATED: Nested 'commands' syntax (still supported)
# reduce:
#   commands:
#     - claude: "/summarize ${map.results}"

# Optional: Custom merge workflow (supports two formats)
merge:
  # Simple array format
  - shell: "git fetch origin"
  - claude: "/merge-worktree ${merge.source_branch}"
  - shell: "cargo test"

# OR full format with timeout
# merge:
#   commands:
#     - shell: "git fetch origin"
#     - claude: "/merge-worktree ${merge.source_branch}"
#   timeout: 600  # Timeout in seconds

# Error handling policy
error_policy:
  on_item_failure: dlq  # dlq, retry, skip, stop, or custom handler name
  continue_on_failure: true
  max_failures: 5
  failure_threshold: 0.2  # 20% failure rate
  error_collection: aggregate  # aggregate, immediate, or batched:N

  # Circuit breaker configuration
  circuit_breaker:
    failure_threshold: 5      # Open circuit after N failures
    success_threshold: 2      # Close circuit after N successes
    timeout: "60s"           # Duration before attempting half-open (humantime format: "1s", "1m", "5m")
    half_open_requests: 3    # Test requests in half-open state

  # Retry configuration with backoff
  retry_config:
    max_attempts: 3
    # BackoffStrategy is an enum - use one variant:
    backoff:
      exponential:
        initial: "1s"
        multiplier: 2.0

# Convenience fields (alternative to nested error_policy)
# These top-level fields map to error_policy for simpler syntax
on_item_failure: dlq
continue_on_failure: true
max_failures: 5

Additional Topics

See also: - Environment Variables in Configuration - Backoff Strategies - Error Collection Strategies - Setup Phase (Advanced) - Global Storage Architecture - Event Tracking - Checkpoint and Resume - Dead Letter Queue (DLQ) - Workflow Format Comparison