
AI / ML Infrastructure

Petabyte-scale ML Training Data Pipeline

A frontier training run rents tens of thousands of GPUs by the hour, and each one is wasted money the instant it sits idle waiting for the next batch. The data pipeline’s only job is to make sure that never happens: take a petabyte of messy raw data, grind it offline into clean, tokenized, sharded records, then stream batches onto the GPUs fast enough to keep them near 100% utilization, deterministically and resumably, for weeks on end. This page is the story of that fight against GPU starvation: the offline ETL that prepares the corpus, the online dataloader that hides every microsecond of I/O behind compute, the surprisingly hard problem of shuffling at petabyte scale, and the bottlenecks that decide whether each GPU-hour you pay for buys real training or expensive waiting.

Requirements

The pipeline is a producer and the training loop is a consumer. Success is binary and brutal: if the consumer ever blocks on the producer, GPUs starve and money burns. An idle 100k-GPU fleet at a few dollars per GPU-hour wastes on the order of hundreds of thousands of dollars per hour, so the entire design optimizes for one number — keeping the dataloader strictly ahead of the GPUs.

| Functional | Non-functional |
|---|---|
| Ingest raw data: pull petabytes of heterogeneous data (web dumps, code, images, logs) from object storage and warehouses into a processable form. | Saturate GPU throughput: deliver batches at a rate that holds GPU utilization near 100%; the dataloader must never be on the critical path of a training step. |
| Clean, tokenize, transform: normalize, dedup, quality-filter, tokenize text / decode images, and pack samples into model-ready tensors. | Deterministic & reproducible: the same seed yields the exact same sample order, so a run's input stream can be replayed exactly for debugging and resume. |
| Shard & shuffle: split the corpus into many independent shards and randomize sample order each epoch to avoid correlated batches. | Handle petabytes: the corpus never fits on one machine or even one rack; everything is streamed from object storage, never fully materialized locally. |
| Stream batches to trainers: feed each of thousands of data-parallel ranks its own non-overlapping slice, prefetching ahead and overlapping I/O with compute. | Elastic: the number of ranks can change (preemption, scale-up, failures) and the pipeline re-shards and continues without re-reading consumed data. |
| Resumable: on restart, resume from the exact sample cursor (epoch + shard + offset), not from the start of the epoch. | Cost-efficient & fault-tolerant: offline cost is amortized across many runs; the online path survives slow shards, stragglers, and transient object-store errors. |

The one invariant

Define data-stall as the fraction of step time the GPU spends waiting for input. The whole pipeline exists to drive data-stall toward zero. Every technique below — offline precomputation, sharding, prefetch, pinned-memory H2D overlap, local NVMe caching — is a way to keep the next batch ready before the GPU asks for it.
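As a minimal sketch, data-stall can be computed from per-step timings if the training loop records how long it blocked waiting on the dataloader separately from the time it spent computing. The `StepTiming` and `data_stall` names are illustrative, not taken from any particular framework.

```python
from dataclasses import dataclass

@dataclass
class StepTiming:
    wait_s: float     # time the training loop blocked waiting for the next batch
    compute_s: float  # time spent in forward, backward, and optimizer update

def data_stall(steps: list[StepTiming]) -> float:
    """Fraction of total step time spent waiting on input; 0.0 means never starved."""
    total = sum(s.wait_s + s.compute_s for s in steps)
    return sum(s.wait_s for s in steps) / total if total > 0 else 0.0

print(data_stall([StepTiming(0.0, 1.0), StepTiming(0.25, 0.75)]))  # 0.125
```

One fully fed step plus one step that waited 0.25 s gives 0.25 s of waiting out of 2.0 s total, a data-stall of 12.5%. Tracking this per rank, rather than as a fleet average, is what exposes a single starving rank.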

Scale & back-of-envelope math

Start from the GPUs and work backwards: their aggregate appetite for samples is the throughput the dataloader must sustain. If the fleet chews through X samples/sec, the pipeline has to produce at least X samples/sec, indefinitely, with margin.

| Quantity | Estimate | How we get there |
|---|---|---|
| Corpus size | ~5 PB raw → ~1 PB after filtering | Web-scale crawl + code + multimodal; aggressive dedup and quality filtering throw away the majority of bytes. |
| Tokens | ~15 trillion tokens | A modern LLM corpus; at ~4 bytes/token (int32 ids) the tokenized output is 15e12 × 4 ≈ 60 TB, far smaller than the raw input. |
| Bytes / sample | ~8 KB (text sequence) to MBs (image) | A 4K-token packed sequence in int32 ids is 4096 × 4 ≈ 16 KB; stored as uint16 token ids (possible only when the vocabulary has at most 65,536 entries) it halves to ~8 KB. |
| Required dataloader throughput | ~5–10 M tokens/sec (cluster) | 100k GPUs at, say, ~50–100 tokens/sec/GPU of training throughput ⇒ 5e6–1e7 tokens/sec, i.e. roughly 1,200–2,400 packed 4K-sequences/sec. |
| Storage read bandwidth | ~20–40 MB/s (tokenized text) | 5e6–1e7 tokens/s × ~4 bytes ≈ 20–40 MB/s aggregate from the object store across all ranks; modest because the heavy reduction happened offline. Raw bytes would be ~50× worse, and multimodal samples at MBs each are far heavier. |
| Preprocessing CPU cost | ~1e5–1e6 core-hours (once) | Tokenizing + dedup over PBs is a massive one-time batch job; amortized across every training run that reuses the dataset, so cost-per-run is small. |
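The throughput arithmetic can be reproduced in a few lines. This is a sketch under the table's own assumptions (100k GPUs, 50–100 tokens/sec/GPU, 4,096-token sequences, 4-byte token ids); the $3 per GPU-hour figure is a hypothetical stand-in for "a few dollars". Note the units: tokens/sec × bytes/token gives bytes/sec, reported here in MB/s.

```python
def envelope(num_gpus: int, tokens_per_gpu_s: float, seq_len: int = 4096,
             bytes_per_token: int = 4, usd_per_gpu_hour: float = 3.0) -> dict:
    """Back-of-envelope dataloader requirements for a data-parallel fleet."""
    tokens_s = num_gpus * tokens_per_gpu_s                  # aggregate token appetite
    return {
        "tokens_per_s": tokens_s,
        "sequences_per_s": tokens_s / seq_len,              # packed fixed-length sequences
        "read_MB_per_s": tokens_s * bytes_per_token / 1e6,  # aggregate object-store reads
        "idle_usd_per_hour": num_gpus * usd_per_gpu_hour,   # cost of a fully stalled fleet
    }

for rate in (50, 100):
    e = envelope(100_000, rate)
    print(rate, round(e["sequences_per_s"]), e["read_MB_per_s"], e["idle_usd_per_hour"])
# 50 1221 20.0 300000.0
# 100 2441 40.0 300000.0
```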

Why offline preprocessing is the whole trick

Note the 50× gap between raw read bandwidth and tokenized read bandwidth. If we cleaned, filtered, and tokenized on the fly during training, we would re-read ~1–5 PB and redo all of that CPU work on every epoch, right on the critical path of 100k GPUs. By paying the cost once, offline, the online path only moves small, pre-digested records, and CPU per step drops to cheap decode + collate. Move work out of the hot loop.

Offline preprocessing (the ETL)

The offline stage is a classic big-data batch job — run on Spark, Ray Data, or Apache Beam over thousands of CPU cores — that turns raw bytes into training-ready shards. It runs rarely (once per dataset version), so it optimizes for throughput and quality, not latency, and its output is immutable and cached for every run that follows.

```mermaid
flowchart TD
  RAW["Raw corpus in object store (PB scale)"] --> ING["Ingest: web dumps, logs, images"]
  ING --> CLEAN["Clean + normalize (Spark / Ray / Beam)"]
  CLEAN --> DEDUP["Dedup (MinHash + exact hash)"]
  DEDUP --> FILTER["Quality filter + PII / toxicity removal"]
  FILTER --> TOK["Tokenize (BPE / SentencePiece)"]
  TOK --> PACK["Pack samples into fixed-length sequences"]
  PACK --> WRITE["Write sharded records (WebDataset / TFRecord / Parquet)"]
  WRITE --> STORE["Sharded dataset back in object store"]
  STORE --> IDX["Build shard index + manifest + stats"]
  IDX --> READY["Ready for training"]
```

The offline pipeline reduces petabytes of raw input to a compact, sharded, tokenized dataset plus a manifest — run once, reused by every training job.
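The "pack samples into fixed-length sequences" step is commonly done by greedy concatenation: append each tokenized document plus an end-of-sequence token to a running buffer and cut the buffer into `seq_len`-sized pieces. A minimal sketch of that one scheme; `eos_id` and the drop-the-remainder policy are assumptions, and real packers may also record document boundaries for attention masking.

```python
from typing import Iterable, Iterator

def pack_sequences(docs: Iterable[list[int]], seq_len: int, eos_id: int) -> Iterator[list[int]]:
    """Greedy packing: concatenate docs (each followed by EOS) and cut fixed-length chunks.

    Documents may straddle chunk boundaries; the final partial chunk is dropped.
    """
    buf: list[int] = []
    for doc in docs:
        buf.extend(doc)
        buf.append(eos_id)          # separator so the model sees document boundaries
        while len(buf) >= seq_len:
            yield buf[:seq_len]     # one fixed-length training sequence
            buf = buf[seq_len:]

print(list(pack_sequences([[1, 2, 3], [4, 5], [6, 7, 8, 9]], seq_len=4, eos_id=0)))
# [[1, 2, 3, 0], [4, 5, 0, 6], [7, 8, 9, 0]]
```

Packing wastes no slots on padding, which is why every sample in the table above is a full 4K-token sequence.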

Version everything

The output dataset is content-addressed and versioned (dataset v3.2, tokenizer v7). Reproducibility demands that “train on dataset X” always means the exact same bytes; mutating a dataset in place silently breaks every claim of determinism downstream.
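One way to make "dataset X" mean exact bytes is to hash every shard, record the hashes in a manifest together with the tokenizer version, and derive the dataset ID from a canonical encoding of that manifest. A sketch using SHA-256 from the standard library; the manifest layout and `dataset_id` helper are hypothetical, and real shards would be hashed by streaming the file in chunks rather than holding it in memory.

```python
import hashlib
import json

def dataset_id(shards: dict[str, bytes], tokenizer_version: str) -> tuple[str, dict]:
    """Return (content-derived dataset ID, manifest) for a set of shards."""
    manifest = {
        "tokenizer": tokenizer_version,
        # Per-shard content hashes: any changed byte changes that shard's digest.
        "shards": {name: hashlib.sha256(blob).hexdigest()
                   for name, blob in sorted(shards.items())},
    }
    # Canonical JSON (sorted keys, no whitespace) so the same manifest always hashes the same.
    canonical = json.dumps(manifest, sort_keys=True, separators=(",", ":")).encode()
    return hashlib.sha256(canonical).hexdigest(), manifest
```

Because the ID is derived from content, mutating a shard in place changes the ID, so the mismatch is detectable instead of silent.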

Online dataloading

The online path is where starvation is won or lost. Each data-parallel rank runs a dataloader of CPU worker processes that stream shards from object storage, decode and transform samples, collate them into a batch tensor, and hand it to the GPU — all while the GPU is busy on the previous batch. The design goal is that the GPU never observes any of this work.

```mermaid
sequenceDiagram
  participant OS as Object store
  participant W as CPU workers
  participant Q as Prefetch queue
  participant P as Pinned buffer
  participant G as GPU
  W->>OS: Request next shard for this rank
  OS-->>W: Stream samples
  W->>W: Decode + augment + collate batch
  W->>Q: Push ready batch N+1
  Q->>P: Stage batch into pinned memory
  P-->>G: Async H2D copy on side stream
  G->>G: Train step on batch N
  Note over W,Q: Workers prefetch N+1 while GPU computes N
  G-->>Q: Signal slot free (backpressure)
```

Prefetch, transform, and host-to-device copy of batch N+1 all overlap the GPU compute of batch N; a bounded queue applies backpressure so workers run ahead of the GPU by at most the queue depth (one batch in this diagram) instead of buffering without limit.
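The bounded-queue backpressure in the diagram can be sketched with the standard library: a producer thread builds batches and puts them into a `queue.Queue(maxsize=depth)`, which blocks once `depth` finished batches are waiting, while the consumer takes them off. This is an illustration only; production dataloaders typically use worker processes rather than threads for CPU-heavy decode, and the pinned-memory staging and async H2D copy are not shown.

```python
import queue
import threading
from typing import Callable, Iterator

_DONE = object()  # sentinel marking the end of the stream

def prefetching_iter(make_batch: Callable[[int], object], num_batches: int,
                     depth: int = 2) -> Iterator[object]:
    """Yield batches built by a background thread that buffers at most `depth` finished batches."""
    q: queue.Queue = queue.Queue(maxsize=depth)

    def producer() -> None:
        for i in range(num_batches):
            q.put(make_batch(i))  # blocks while `depth` batches are waiting: backpressure
        q.put(_DONE)

    worker = threading.Thread(target=producer, daemon=True)
    worker.start()
    while (batch := q.get()) is not _DONE:
        yield batch  # the consumer's step on this batch overlaps building the next one
    worker.join()

for batch in prefetching_iter(lambda i: [i] * 4, num_batches=3):
    print(batch)  # [0, 0, 0, 0], then [1, 1, 1, 1], then [2, 2, 2, 2]
```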

The overlap budget

If a GPU step takes T_gpu and producing a batch takes T_data (the effective time for the whole worker pool to deliver one batch), the loop is GPU-bound (the good case) only while T_data ≤ T_gpu. Prefetch hides T_data entirely as long as it stays at or below T_gpu; once it exceeds T_gpu, the GPU waits and data-stall goes positive. Every online technique is about keeping T_data comfortably under the step time, with margin for jitter and the occasional slow shard.
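The overlap budget can be checked with a tiny discrete-time model. The sketch assumes a single logical producer that delivers one batch every `t_data` seconds (the effective rate of the whole worker pool), a queue holding at most `depth` finished batches, and a fixed GPU step of `t_gpu`; `simulate_stall` is a hypothetical helper, and it returns the total time the GPU waited after the first batch.

```python
def simulate_stall(t_gpu: float, t_data: float, steps: int, depth: int = 2) -> float:
    """Total GPU wait time over `steps` steps, not counting the first batch's warm-up."""
    ready = []      # ready[i]: time batch i finishes production
    taken = []      # taken[i]: time the GPU starts its step on batch i
    gpu_free = 0.0  # time the previous GPU step finished
    stall = 0.0
    for i in range(steps):
        start = ready[-1] if ready else 0.0       # producer builds batches back to back
        if i >= depth:
            start = max(start, taken[i - depth])  # ...but needs a free queue slot
        ready.append(start + t_data)
        begin = max(gpu_free, ready[i])           # GPU waits if the batch is not ready
        if i > 0:
            stall += begin - gpu_free
        taken.append(begin)
        gpu_free = begin + t_gpu
    return stall

print(simulate_stall(1.0, 0.8, steps=100))  # 0.0: production keeps up
print(simulate_stall(1.0, 1.2, steps=100))  # about 19.8: 99 steps x 0.2 s deficit
```

Under these assumptions the stall stays at zero whenever `t_data <= t_gpu` and grows by `t_data - t_gpu` per step once production falls behind; a deeper queue absorbs jitter but cannot fix a sustained deficit.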

Deep dive: shuffling at scale

SGD assumes each batch is a roughly i.i.d. sample of the data. If batches are correlated — all from one source, one topic, one time window — gradients are biased and training destabilizes. So we must shuffle. The catch: a true global shuffle of a petabyte means randomly reading samples from everywhere, which is a random-I/O nightmare for object storage. The art is approximating a global shuffle while keeping reads sequential.

```mermaid
flowchart LR
  SL["Global shard list"] --> PERM["Permute by seed = f(epoch)"]
  PERM --> A0["Rank 0 shard subset"]
  PERM --> A1["Rank 1 shard subset"]
  PERM --> AN["Rank N shard subset"]
  A0 --> BUF["Local shuffle buffer (size B samples)"]
  BUF --> PICK["Pick random slot, refill from shard"]
  PICK --> BATCH["Emit randomized batch"]
  BATCH --> TR["Trainer rank 0"]
  CUR["Cursor: epoch + shard + offset"] -.-> BUF
  CUR -.-> CKPT["Checkpoint for mid-epoch resume"]
```

Two-level shuffle: deterministically permute shard assignment each epoch (coarse), then draw samples through a bounded in-memory buffer (fine). The cursor makes it all resumable.
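The coarse level can be sketched as a seeded permutation of the global shard list followed by a strided split across ranks. Deriving the seed from the epoch lets every rank compute the same permutation independently, with no coordination. The string seed format and strided split are assumptions of this sketch; Python also does not promise that `random.shuffle` produces identical orders across interpreter versions, so a production system would pin its RNG implementation.

```python
import random

def shards_for_rank(shards: list[str], epoch: int, rank: int, world_size: int,
                    base_seed: int = 0) -> list[str]:
    """This rank's disjoint slice of the epoch's shard permutation."""
    order = sorted(shards)                                # canonical starting order
    random.Random(f"{base_seed}:{epoch}").shuffle(order)  # seed = f(epoch)
    # Strided split: rank r takes positions r, r + world_size, ...
    # If len(shards) % world_size != 0, some ranks get one extra shard;
    # real systems pad or drop shards so ranks stay in lockstep.
    return order[rank::world_size]
```

Every rank calls this with the same shard list and epoch; the slices are disjoint and together cover every shard exactly once.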

| Level | Mechanism | Randomness | Cost |
|---|---|---|---|
| Global shuffle | Randomly permute all samples each epoch (e.g. a Spark-side reshuffle into new shards). | Ideal: fully decorrelated batches. | Very high: random I/O or a full rewrite of the dataset per epoch. Rarely affordable at PB scale. |
| Shard-level (coarse) | Permute the order of shards and which rank gets which, keyed by a per-epoch seed. | Good across shards; cheap. Combined with packing, it decorrelates most structure. | Near-zero: just reorders large sequential reads. |
| Buffer-level (fine) | Stream into a shuffle buffer of B samples; emit a random slot, refill from the stream. | Local randomness within a sliding window of size B. | B × sample_size of RAM; bigger buffer ⇒ better mixing. |
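A minimal sketch of the buffer-level mechanism, assuming each rank owns a seeded `random.Random` so its stream is reproducible: fill the buffer, then for every new sample emit a random slot and refill it, and finally drain the leftovers in random order.

```python
import random
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def buffer_shuffle(stream: Iterable[T], buffer_size: int, rng: random.Random) -> Iterator[T]:
    """Approximate shuffle of a sequential stream using a bounded buffer of `buffer_size` items."""
    buf: list[T] = []
    for item in stream:
        if len(buf) < buffer_size:
            buf.append(item)            # fill phase
            continue
        idx = rng.randrange(buffer_size)
        yield buf[idx]                  # emit a random slot...
        buf[idx] = item                 # ...and refill it from the stream
    rng.shuffle(buf)                    # drain the leftovers in random order
    yield from buf
```

With `buffer_size=1` it degenerates to the input order, a quick way to see why B must be large relative to any correlated runs inside a shard.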

Production systems combine the two cheap levels: shuffle shard assignment per epoch (coarse) and shuffle samples within a buffer (fine). With shards already containing packed, deduped, source-mixed sequences, this two-level scheme is statistically close to a global shuffle at a tiny fraction of the I/O cost.

Determinism vs throughput tension

Strict global determinism can fight performance: it forbids consuming whichever shard arrives first and pins exact ordering. Most teams choose per-rank determinism (each rank’s stream is reproducible) while allowing cross-rank arrival order to float, recovering throughput without giving up replayability where it matters.
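Per-rank determinism plus a persisted cursor (epoch + shard + offset) is what makes mid-epoch resume cheap: on restart, a rank regenerates the same seeded shard plan and skips straight to the saved position instead of replaying the epoch. A sketch under simplifying assumptions: `read_shard` is a hypothetical callable returning a shard's samples, the plan is a seeded strided split, and shuffle-buffer state is ignored (a real loader would also checkpoint it or rebuild it deterministically).

```python
import random
from dataclasses import dataclass
from typing import Callable, Iterator, Sequence

@dataclass(frozen=True)
class Cursor:
    epoch: int
    shard_idx: int  # index into this rank's shard list for the epoch
    offset: int     # samples already consumed from that shard

def rank_stream(read_shard: Callable[[str], Sequence], shards: list[str], cursor: Cursor,
                rank: int, world_size: int, seed: int = 0) -> Iterator[tuple[Cursor, object]]:
    """Deterministic per-rank sample stream that can restart from a saved cursor."""
    plan = sorted(shards)
    random.Random(f"{seed}:{cursor.epoch}").shuffle(plan)  # same plan on every restart
    mine = plan[rank::world_size]
    for s in range(cursor.shard_idx, len(mine)):
        samples = read_shard(mine[s])
        start = cursor.offset if s == cursor.shard_idx else 0  # skip consumed samples
        for o in range(start, len(samples)):
            # Yield the cursor *after* this sample; persisting it with the
            # model checkpoint lets a restart continue from the next sample.
            yield Cursor(cursor.epoch, s, o + 1), samples[o]
```

Resuming from the cursor saved alongside sample k yields exactly the samples that an uninterrupted run would have produced after k.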

Bottlenecks & scaling

At petabyte / 100k-GPU scale, the failure modes are predictable. Each one manifests as the same symptom — GPU utilization drops — so the discipline is to instrument data-stall per rank and attack whichever resource is saturated.

| Bottleneck | Why it bites | Mitigation |
|---|---|---|
| Storage read bandwidth | Thousands of ranks hammering the same object store can exceed its per-prefix or aggregate throughput, throttling reads. | Large sequential shards; spread keys across many prefixes; local NVMe cache for re-reads; co-locate data near compute; compress shards. |
| CPU preprocessing bound | If decode / augment / collate per batch costs more CPU than available, T_data > T_gpu and the GPU stalls. | Push heavy work offline; add CPU workers; use fast codecs; offload decode/augment to GPU (DALI); store pre-tokenized records. |
| Shuffle randomness vs cost | Bigger shuffle buffers and finer shuffles improve convergence but cost RAM and random I/O. | Two-level shuffle; size buffer B to the quality/RAM sweet spot; pre-mix sources at write time so cheap shuffles suffice. |
| Stragglers | A synchronous all-reduce step runs at the speed of the slowest rank; one slow shard or noisy node stalls the entire fleet. | Deep prefetch to absorb jitter; timeouts + retry on alternate replicas; replicate hot shards; drop/skip a pathological shard rather than block. |
| Small-file problem | Millions of tiny per-sample files mean one slow metadata/GET round-trip per sample: latency-bound and catastrophic for object stores. | Pack thousands of samples per shard (WebDataset tar / TFRecord); target ~256 MB–1 GB shards; one open streams many samples. |
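The small-file mitigation can be sketched with the standard library's `tarfile`: many samples go into one archive, and a reader recovers them in a single sequential pass over one object. The layout loosely follows the WebDataset convention of grouping a sample's files by a shared key before the extension; `write_tar_shard` and `read_tar_shard` are hypothetical helpers, and keys are assumed not to contain dots.

```python
import io
import tarfile
from typing import Iterator

def write_tar_shard(samples: list[tuple[str, dict[str, bytes]]]) -> bytes:
    """Pack many samples into one tar archive, e.g. '000001.txt' and '000001.json'."""
    out = io.BytesIO()
    with tarfile.open(fileobj=out, mode="w") as tar:
        for key, fields in samples:
            for ext, payload in fields.items():
                info = tarfile.TarInfo(name=f"{key}.{ext}")
                info.size = len(payload)
                tar.addfile(info, io.BytesIO(payload))
    return out.getvalue()

def read_tar_shard(blob: bytes) -> Iterator[tuple[str, dict[str, bytes]]]:
    """Stream samples back in one sequential pass ('r|' = no seeking), regrouped by key."""
    current_key, current = None, {}
    with tarfile.open(fileobj=io.BytesIO(blob), mode="r|") as tar:
        for member in tar:
            key, ext = member.name.split(".", 1)  # key is everything before the first dot
            data = tar.extractfile(member).read()
            if current_key is not None and key != current_key:
                yield current_key, current        # previous sample is complete
                current = {}
            current_key = key
            current[ext] = data
    if current_key is not None:
        yield current_key, current
```

One GET of a ~256 MB–1 GB shard built this way replaces thousands of per-sample round-trips.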

Summary

Feeding 100k GPUs is an exercise in moving work out of the hot loop and hiding the rest behind compute. Do the expensive, one-time grind — clean, dedup, filter, tokenize, pack — offline, and write big sharded records so the online path only streams small, pre-digested bytes. On the hot path, give every rank a disjoint shard slice, prefetch with many CPU workers, collate into pinned memory for async H2D overlap, and cache hot shards on local NVMe. Approximate a global shuffle with a cheap two-level scheme, and make the whole stream deterministic and resumable via a seeded plan and a persisted cursor. Get those right and data-stall stays near zero — which, at this price per GPU-hour, is the entire game.