MapReduce Workflows Overview¶
MapReduce workflows enable parallel processing of large datasets by distributing work items across isolated agents. This powerful execution mode allows you to process hundreds or thousands of items concurrently, making it ideal for bulk operations, data migrations, and large-scale code transformations.
What is MapReduce Mode?¶
MapReduce is an execution mode in Prodigy that follows the classic MapReduce pattern:
- Setup Phase (optional): Prepare work items and initialize the environment
- Map Phase: Process work items in parallel across isolated agents
- Reduce Phase: Aggregate results and perform final operations
Each phase executes in an isolated git worktree, ensuring your main repository remains untouched during execution. Agent results are automatically merged, and the final changes are presented for your approval before merging to your original branch.
When to Use MapReduce¶
Use MapReduce workflows when:
- Processing 10+ independent items (e.g., updating copyright headers in 500 files)
- Each item takes significant time (e.g., running linting on each module)
- Operations can be parallelized safely (no shared state dependencies)
- You need fault isolation per item (one failure doesn't block others)
Use standard workflows for:
- Sequential operations that depend on previous results
- Single-item processing
- Workflows requiring shared mutable state
- Operations that must execute in a specific order
Performance Sweet Spot
MapReduce excels when you have 10-1000 work items that each take 10 seconds to 5 minutes to process. For smaller workloads, the overhead of parallelization may not be worthwhile.
The Three Phases¶
graph TD
Start[Workflow Start] --> Setup[Setup Phase<br/>Generate work items]
Setup --> Extract[Extract Items<br/>Apply filter, sort]
Extract --> Map[Map Phase<br/>Parallel Processing]
Map --> A1[Agent 1<br/>Item 0]
Map --> A2[Agent 2<br/>Item 1]
Map --> A3[Agent 3<br/>Item 2]
Map --> AN[Agent N<br/>Item N]
A1 -->|Success| Merge[Merge Results]
A2 -->|Success| Merge
A3 -->|Success| Merge
AN -->|Success| Merge
A1 -->|Failure| DLQ[Dead Letter Queue]
A2 -->|Failure| DLQ
A3 -->|Failure| DLQ
AN -->|Failure| DLQ
Merge --> Reduce[Reduce Phase<br/>Aggregate & Report]
Reduce --> Complete[Workflow Complete]
DLQ -.->|Retry| Map
style Setup fill:#e1f5ff
style Map fill:#fff3e0
style Reduce fill:#f3e5f5
style DLQ fill:#ffebee
style Complete fill:#e8f5e9
Figure: MapReduce execution flow showing the three phases, parallel agent processing, and error handling through the Dead Letter Queue.
Setup Phase¶
The optional setup phase prepares your environment and generates work items:
# Source: workflows/mapreduce-example.yml:5-6
setup:
- shell: "debtmap analyze . --output debt_items.json"
Key capabilities:
- Execute commands before parallel processing begins
- Generate JSON files containing work items
- Capture outputs for use in later phases
- Initialize databases, fetch data, or prepare resources
The setup phase runs once in the MapReduce worktree and creates a checkpoint for resume.
Map Phase¶
The map phase distributes work items to parallel agents:
# Source: workflows/mapreduce-example.yml:9-27
map:
input: debt_items.json # (1)!
json_path: "$.debt_items[*]" # (2)!
agent_template: # (3)!
- claude: "/fix-issue ${item.description} --file ${item.location.file}"
- shell: "cargo test"
on_failure: # (4)!
claude: "/debug-test ${shell.output}"
max_parallel: 10 # (5)!
filter: "severity == 'high' || severity == 'critical'" # (6)!
sort_by: "priority" # (7)!
1. JSON file containing work items (generated in setup phase)
2. JSONPath expression to extract items from the JSON structure
3. Template commands executed by each agent for its work item
4. Fallback commands executed if the previous command fails
5. Maximum number of agents running concurrently (controls parallelism)
6. Only process items matching this condition (reduces workload)
7. Process items in this order (e.g., high-priority items first)
Key capabilities:
- JSONPath extraction: Extract work items from JSON files
- Parallel execution: Run multiple agents concurrently (controlled by
max_parallel) - Agent isolation: Each agent runs in its own git worktree
- Filtering: Process only items matching criteria
- Sorting: Control processing order
- Error handling: Failed items route to Dead Letter Queue (DLQ)
Each agent executes the agent_template commands with access to ${item} variables. Agents commit their changes and merge back to the MapReduce worktree automatically.
Reduce Phase¶
The reduce phase aggregates results after all map agents complete:
# Source: workflows/mapreduce-example.yml:30-39
reduce:
- claude: "/summarize-fixes ${map.results}"
capture_output: true
- shell: "echo 'Fixed ${map.successful}/${map.total} items'"
- claude: "/generate-report --fixed ${map.successful} --failed ${map.failed}"
Key capabilities:
- Result aggregation: Access to
${map.results},${map.successful},${map.failed},${map.total} - Sequential execution: Commands run in order
- Report generation: Summarize outcomes, generate artifacts
- Validation: Verify all items were processed correctly
The reduce phase runs once in the MapReduce worktree with full visibility into map phase results.
Work Items and Distribution¶
What are Work Items?¶
Work items are individual units of work extracted from JSON files. Each item is processed by exactly one agent in the map phase.
Example work items:
[
{"id": 1, "file": "src/main.rs", "issue": "unused import"},
{"id": 2, "file": "src/lib.rs", "issue": "missing docs"},
{"id": 3, "file": "tests/test.rs", "issue": "deprecated API"}
]
Work Distribution Features¶
JSONPath Extraction:
Filtering:
Sorting:
Pagination:
Deduplication:
Parallel Execution and Isolation¶
How Parallel Execution Works¶
When the map phase begins, Prodigy:
- Extracts work items from the input JSON
- Applies filtering, sorting, and deduplication
- Creates a pool of agent slots (limited by
max_parallel) - Assigns work items to available agents
- Each agent runs in an isolated git worktree
- Agents execute independently and commit their changes
- Completed agents merge back to the MapReduce worktree
- Failed items are routed to the Dead Letter Queue
Agent Isolation¶
Each agent runs in its own git worktree branched from the MapReduce worktree:
graph TD
Main[Original Branch<br/>main/feature-xyz] --> Parent[MapReduce Worktree<br/>session-abc123]
Parent --> A1[Agent Worktree 1<br/>agent-1-item-1]
Parent --> A2[Agent Worktree 2<br/>agent-2-item-2]
Parent --> A3[Agent Worktree 3<br/>agent-3-item-3]
Parent --> AN[Agent Worktree N<br/>agent-N-item-N]
A1 -->|Merge Changes| Parent
A2 -->|Merge Changes| Parent
A3 -->|Merge Changes| Parent
AN -->|Merge Changes| Parent
Parent -.->|User Approval| Main
style Main fill:#e8f5e9
style Parent fill:#e1f5ff
style A1 fill:#fff3e0
style A2 fill:#fff3e0
style A3 fill:#fff3e0
style AN fill:#fff3e0
Figure: Worktree isolation architecture showing how agents branch from the parent MapReduce worktree and merge their changes back independently.
Benefits of isolation:
- No conflicts: Agents can't interfere with each other
- Independent failures: One agent failure doesn't affect others
- Clean merges: Each agent's changes merge independently
- Resource safety: No shared mutable state
Cleanup Failure Handling
If worktree cleanup fails after agent completion, the agent's work is still preserved. Failed cleanups are tracked in an orphaned worktree registry and can be retried with:
See Spec 136 for details.Controlling Parallelism¶
# Static parallelism
max_parallel: 10
# Dynamic parallelism from environment
max_parallel: "$MAX_WORKERS"
Resource Limits
Each agent consumes system resources (CPU, memory, disk I/O). Start with conservative max_parallel values and increase based on system capacity. A good starting point is 5-10 agents.
Example: Bulk Code Updates¶
When to Use This Pattern
This example demonstrates a common MapReduce use case: applying the same transformation to many files independently. Use this pattern for bulk updates like copyright headers, import statements, formatting changes, or license headers where each file can be processed in isolation.
Here's a complete example updating copyright headers across a codebase:
# Source: workflows/mapreduce-example.yml (adapted)
name: update-copyright-headers
mode: mapreduce
setup:
- shell: |
# Generate list of source files
find src -name "*.rs" -type f | jq -R -s -c 'split("\n")[:-1] | map({file: .})' > files.json
map:
input: files.json
json_path: "$[*]"
max_parallel: 20
agent_template:
- shell: |
# Update copyright header
sed -i '1s/Copyright 2023/Copyright 2024/' ${item.file}
git add ${item.file}
git commit -m "Update copyright in ${item.file}"
reduce:
- shell: |
echo "Updated copyright headers in ${map.successful}/${map.total} files"
- shell: |
if [ ${map.failed} -gt 0 ]; then
echo "Failed files: ${map.failed}"
echo "Check DLQ: prodigy dlq show <job-id>"
fi
Error Handling and Recovery¶
Dead Letter Queue (DLQ)¶
Failed work items are automatically sent to the DLQ with:
- Original work item data
- Failure reason and error message
- Timestamp and retry count
- Claude JSON log location for debugging (via
json_log_locationfield)
Debugging with JSON Logs
Each DLQ item includes a json_log_location field pointing to the Claude execution log. Use this to inspect the complete conversation, tool invocations, and error context:
Retry failed items:
Checkpoint and Resume¶
MapReduce workflows create checkpoints after each phase:
- Setup checkpoint: Setup phase results and artifacts
- Map checkpoint: Work item states and agent results
- Reduce checkpoint: Completed reduce steps and variables
Resume an interrupted workflow:
All completed work is preserved. In-progress items restart from the beginning.
Concurrent Resume Protection
Resume operations are protected from concurrent execution using automatic lock management. If another process is already resuming the same job, you'll receive an error with details about the lock holder. Stale locks (from crashed processes) are automatically detected and cleaned up. See Spec 140 for details.
Learn More¶
Ready to dive deeper? Explore these topics:
- Work Distribution - JSONPath, filtering, sorting, and deduplication
- Checkpoint and Resume - State management and recovery
- Dead Letter Queue (DLQ) - Error handling and retry strategies
- Event Tracking - Monitoring and debugging with event logs
- Environment Variables - Parameterization and secrets
- Global Storage Architecture - Cross-repository state management
Next Steps¶
- Try the Quick Start - Run the minimal example in MapReduce Workflows
- Understand Your Use Case - Identify work items and parallelization opportunities
- Start Small - Begin with
max_parallel: 5and a subset of items - Monitor and Tune - Use event tracking and DLQ to optimize performance
- Scale Up - Increase parallelism based on system capacity
Best Practice
Always test MapReduce workflows with a small subset of items first (using max_items: 10) before running on the full dataset. This helps identify issues early and validates your workflow logic.