System Design Notes

Storage & Media

YouTube Top-K Videos

Query the top K most-viewed videos (K ≤ 1000) over fixed windows: the sliding last minute, hour, day, and month, plus all-time. A ~1M views/sec firehose flows through Kafka into counter shards that keep exact counts plus a size-K min-heap per window. The defining constraint is "no approximations": the authoritative path must be exact.

Requirements

Functional

Non-functional

Scale & back-of-the-envelope

API design

GET /views/top?k={K}&window={minute|hour|day|month|all-time}
-> { "window":"day", "asOf":"...", "results":[ {"videoId":"abc","views":98342111}, ... ] }

# k clamped to [1,1000]; window is a fixed enum (no start/end params)
# Cache-Control: max-age=5  (still satisfies <1 min freshness)

Views are produced to Kafka (partitioned by videoId) rather than POSTed synchronously, to absorb 1M/s.
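The query contract in the API comments (K clamped to [1, 1000], window limited to a fixed enum with no start/end params) can be sketched as a small validation helper. This is a minimal Python sketch; `parse_top_k_params` and the `Window` enum are hypothetical names, not part of any stated service API.

```python
from enum import Enum

K_MIN, K_MAX = 1, 1000  # clamp bounds from the API notes


class Window(Enum):
    """The fixed set of windows; there are no free-form start/end params."""
    MINUTE = "minute"
    HOUR = "hour"
    DAY = "day"
    MONTH = "month"
    ALL_TIME = "all-time"


def parse_top_k_params(k: str, window: str) -> tuple[int, Window]:
    """Validate GET /views/top query params (hypothetical helper).

    An out-of-range k is clamped rather than rejected; int() raises ValueError
    on non-numeric input, and Window(...) raises ValueError for unknown windows.
    """
    k_int = max(K_MIN, min(K_MAX, int(k)))
    return k_int, Window(window)


print(parse_top_k_params("5000", "day"))  # (1000, <Window.DAY: 'day'>)
```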

High-level design

Client → load balancer → Top-K Service. The View Stream (Kafka) feeds Counter shards (each holding per-window counts + a top-K heap); Zookeeper coordinates shard ownership; checkpointing to blob storage persists shard state.

flowchart LR
    Client(["Client"]) --> LB["Load Balancer"]
    LB --> TKS["Top-K Service"]
    Kafka["View Stream (Kafka)"]
    subgraph Shard1["Counter Shard A"]
        HC1["Hour Counts"] --> HH1["Hour Top-K Heap"]
        DC1["Day Counts"] --> DH1["Day Top-K Heap"]
    end
    subgraph Shard2["Counter Shard B"]
        HC2["Hour Counts"] --> HH2["Hour Top-K Heap"]
        DC2["Day Counts"] --> DH2["Day Top-K Heap"]
    end
    Kafka --> HC1
    Kafka --> DC1
    Kafka --> HC2
    Kafka --> DC2
    TKS --> Shard1
    TKS --> Shard2
    TKS --> ZK["Zookeeper"]
    Shard1 --> CP["Checkpointing (Blob Storage)"]
    Shard2 --> CP
      

The design is exact (full integer counts, and videoId partitioning keeps each video's whole count on one shard), fresh (streaming updates apply in ms), fast to read (in-memory heaps), and scalable (add Kafka partitions and shards together).

Deep dive · counting with a size-K min-heap

Each shard keeps, per window, a Counts map (exact integers) and a min-heap of size K. The heap root holds the smallest count in the current top-K, which makes it the admission threshold. A view for a video that is not in the heap and does not beat that threshold costs only an O(1) map increment.

flowchart TD
    V["View event: videoId"] --> R{"Hash videoId to shard"}
    R --> CMap["Per-window Counts: videoId to count"]
    CMap --> Inc["Increment exact count"]
    Inc --> T{"Heap not full, or new count exceeds heap-min?"}
    T -- "yes" --> Upd["Update Top-K min-heap, re-heapify"]
    T -- "no" --> Skip["Skip heap update (hot path)"]
    Upd --> CP["Periodic checkpoint"]
    Skip --> CP
      

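Heap updates are O(log K), using a videoId → heapIndex side map so an entry already in the heap can be re-sifted in place. Reading the top-K in order is O(K log K), because only the heap root is ordered and the K entries must be sorted on read (cheap for K ≤ 1000). The heap is purely a read-side index over the exact counts, so the service never scans the whole map.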
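A minimal Python sketch of the per-window structure described above, assuming string videoIds and a single shard. `TopKTracker` and its methods are hypothetical names. It keeps the exact `counts` map, a min-heap of videoIds ordered by count, and the videoId → heapIndex side map (`pos`) so an entry already in the heap can be re-sifted in place.

```python
class TopKTracker:
    """Exact counts plus a size-K min-heap indexed by a videoId -> heap-index map."""

    def __init__(self, k: int):
        self.k = k
        self.counts: dict[str, int] = {}  # exact per-video counts (source of truth)
        self.heap: list[str] = []  # videoIds; heap[0] has the smallest count in the top-K
        self.pos: dict[str, int] = {}  # side map: videoId -> index in self.heap

    def _less(self, i: int, j: int) -> bool:
        return self.counts[self.heap[i]] < self.counts[self.heap[j]]

    def _swap(self, i: int, j: int) -> None:
        h = self.heap
        h[i], h[j] = h[j], h[i]
        self.pos[h[i]], self.pos[h[j]] = i, j  # keep the side map in sync

    def _sift_up(self, i: int) -> None:
        while i > 0 and self._less(i, (i - 1) // 2):
            self._swap(i, (i - 1) // 2)
            i = (i - 1) // 2

    def _sift_down(self, i: int) -> None:
        n = len(self.heap)
        while True:
            smallest = i
            for child in (2 * i + 1, 2 * i + 2):
                if child < n and self._less(child, smallest):
                    smallest = child
            if smallest == i:
                return
            self._swap(i, smallest)
            i = smallest

    def record_view(self, video_id: str) -> None:
        count = self.counts.get(video_id, 0) + 1
        self.counts[video_id] = count  # O(1) exact increment on every view
        if video_id in self.pos:
            # Already in the top-K: its key grew, so in a min-heap it can only move down.
            self._sift_down(self.pos[video_id])
        elif len(self.heap) < self.k:
            # Heap not full yet: admit unconditionally.
            self.heap.append(video_id)
            self.pos[video_id] = len(self.heap) - 1
            self._sift_up(len(self.heap) - 1)
        elif count > self.counts[self.heap[0]]:
            # Beats the admission threshold: evict the root, sift the newcomer down.
            del self.pos[self.heap[0]]
            self.heap[0] = video_id
            self.pos[video_id] = 0
            self._sift_down(0)
        # Otherwise: hot path, no heap work at all.

    def top_k(self) -> list[tuple[str, int]]:
        # Only the root is ordered, so sort the K entries on read: O(K log K).
        return sorted(((v, self.counts[v]) for v in self.heap), key=lambda p: (-p[1], p[0]))
```

Because counts only grow here, every video outside the heap has a count no greater than the root, so the heap holds an exact top-K (up to ties). That argument depends on increment-only counts, which fits an all-time counter; sliding windows also lose counts when buckets expire, one reason a per-tick recompute, as in the multi-window section, suits them.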

Deep dive · sharding & cross-shard merge

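Partition by videoId (Kafka partition key). Every view for a video routes to the same shard → that shard holds the video's complete, exact count. Global top-K = merge of each shard's local top-K. This is correct because a video in the global top-K must be in its own shard's top-K: its entire count lives on that shard, so every video that outranks it locally also outranks it globally, and fewer than K videos do.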

sequenceDiagram
    participant C as Client
    participant API as TopK Service
    participant ZK as Zookeeper
    participant S as Counter Shards
    participant K as Kafka Views
    K->>S: stream views, partitioned by videoId
    S->>S: update exact counts and per-window heaps
    C->>API: GET top-K, window=day
    API->>ZK: discover shards for window
    API->>S: scatter query to all shards
    S-->>API: each returns local top-K
    API-->>C: merge to global top-K
      
Partition strategy | Effect
By videoId (chosen) | Exact per-video counts; simple merge; risk of a hot shard for viral videos
By view-time / round-robin | Even load, but a video's count is split → can't get exact top-K without global aggregation
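The final merge step of the scatter-gather query can be sketched as below, assuming each shard returns its local top-K as `(videoId, views)` pairs; `merge_top_k` is a hypothetical helper. With S shards the API merges at most S·K entries, which `heapq.nlargest` handles in O(S·K log K).

```python
import heapq


def merge_top_k(shard_results: list[list[tuple[str, int]]], k: int) -> list[tuple[str, int]]:
    """Merge each shard's local top-K into the global top-K.

    Exact only because videoId partitioning gives every video exactly one shard,
    so no videoId appears in more than one shard's list.
    """
    entries = (entry for shard in shard_results for entry in shard)
    return heapq.nlargest(k, entries, key=lambda e: e[1])


shard_a = [("v1", 900), ("v2", 500)]
shard_b = [("v3", 700), ("v4", 100)]
print(merge_top_k([shard_a, shard_b], k=3))  # [('v1', 900), ('v3', 700), ('v2', 500)]
```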

Hot-key mitigation: split a viral key into videoId#0..#m sub-counters across shards and sum on read; or promote ultra-hot keys to a small replicated aggregator.
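A sketch of the sub-counter split, assuming a small, known set of hot keys, round-robin assignment of views to sub-keys, and a CRC32-based shard router. `hot_keys`, `HOT_SPLIT`, `NUM_SHARDS`, `shard_for`, `route_view`, and `read_count` are all hypothetical; a single dict stands in for counters that would really live on different shards.

```python
import zlib

NUM_SHARDS = 8  # hypothetical shard count
HOT_SPLIT = 4  # hypothetical number of sub-counters (videoId#0..#3) per hot key
hot_keys = {"viral123"}  # hypothetical set of keys currently flagged as hot


def shard_for(key: str) -> int:
    # Stable hash so every producer routes a key the same way
    # (Python's built-in str hash is randomized per process).
    return zlib.crc32(key.encode()) % NUM_SHARDS


def route_view(video_id: str, seq: int) -> str:
    """Counter key a view is written to; hot keys are spread round-robin."""
    if video_id in hot_keys:
        return f"{video_id}#{seq % HOT_SPLIT}"  # each sub-key is then placed by shard_for()
    return video_id


def read_count(video_id: str, counters: dict[str, int]) -> int:
    """Sum a hot key's sub-counters on read; a normal key has just one counter."""
    if video_id in hot_keys:
        return sum(counters.get(f"{video_id}#{i}", 0) for i in range(HOT_SPLIT))
    return counters.get(video_id, 0)


counters: dict[str, int] = {}
for seq in range(10):
    key = route_view("viral123", seq)
    counters[key] = counters.get(key, 0) + 1  # would run on shard_for(key)
print(read_count("viral123", counters))  # 10
```

Splitting a key gives up the "whole count on one shard" property that the merge relies on, so shard-local heaps would rank partial sub-counts. One way to keep the result exact is to read the hot keys' sub-counters directly, sum them, and rank those totals alongside the merged shard results (dropping any `videoId#i` entries from the shard lists).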

Deep dive · multi-window aggregation

Maintaining five independent counters per video is wasteful. Instead, keep per-minute buckets and roll them up: hour = sum of the last 60 buckets, day = last 1,440, month = last 43,200 (30 days). The window slides by advancing the ring one minute and evicting buckets older than the window; all-time is a single monotonic counter served from the durable/batch store.
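A Python sketch of the bucket ring, assuming one `Counter` per minute shared by all videos on the shard and an explicit `advance_minute()` call at each minute boundary. Running totals are one way to realize the roll-up without re-summing buckets. `RollingWindows` and its methods are hypothetical names; month is taken as 30 days (43,200 minutes), matching the figure above.

```python
from collections import Counter, deque

WINDOWS = {"minute": 1, "hour": 60, "day": 1440, "month": 43_200}  # sizes in minutes


class RollingWindows:
    """Per-minute buckets in a ring, with a running exact total per sliding window."""

    def __init__(self, windows: dict[str, int] = WINDOWS):
        self.windows = windows
        self.buckets = deque([Counter()])  # buckets[-1] is the current minute
        self.sums = {name: Counter() for name in windows}  # videoId -> count, per window
        self.all_time = Counter()  # monotonic; the notes serve this from a durable store

    def record_view(self, video_id: str) -> None:
        self.buckets[-1][video_id] += 1
        for total in self.sums.values():
            total[video_id] += 1
        self.all_time[video_id] += 1

    def advance_minute(self) -> None:
        """Slide every window forward one minute, evicting the bucket that expired."""
        self.buckets.append(Counter())
        for name, size in self.windows.items():
            if len(self.buckets) > size:
                expired = self.buckets[-size - 1]  # the bucket that just left this window
                total = self.sums[name]
                total.subtract(expired)
                for video_id in expired:
                    if total[video_id] <= 0:
                        del total[video_id]  # keep maps O(active videos)
        while len(self.buckets) > max(self.windows.values()):
            self.buckets.popleft()  # older than the longest window
```

Each window's total grows as views arrive and is corrected once per minute by subtracting the single bucket that just left it, so sliding costs one bucket's worth of work per window per minute rather than re-summing 60, 1,440, or 43,200 buckets.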

flowchart LR
    subgraph Mins["Per-minute count buckets"]
        B1["bucket t-59"]
        B2["bucket t-1"]
        B3["bucket t"]
    end
    B1 --> Hr["Hour rollup: sum last 60"]
    B2 --> Hr
    B3 --> Hr
    Hr --> HrHeap["Hour Top-K Heap"]
    B3 --> Day["Day rollup: sum last 1440"]
    Day --> DayHeap["Day Top-K Heap"]
    Evict["Evict buckets older than window"] --> Mins
      

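Recompute each window's heap on its natural tick (e.g., the hour heap after each minute rollup) instead of on every view. Eviction makes sliding-window counts go down, which the increment-only admission check cannot follow, and a per-tick recompute bounds heap maintenance independently of the 1M/s ingest rate while staying < 1 min stale.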
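Recomputing a window's heap on its tick can be sketched as below, assuming the window's exact counts are available as a `videoId → count` map at tick time; `rebuild_top_k` is a hypothetical helper. Rebuilding with `heapq.nlargest` costs O(N log K) for N active videos in the window, paid once per tick regardless of how many views arrived in between.

```python
import heapq


def rebuild_top_k(window_counts: dict[str, int], k: int) -> list[tuple[str, int]]:
    """Recompute one window's top-K from its exact counts at the window's tick.

    Unlike the incremental admission check, this stays correct when counts
    drop because old buckets were evicted.
    """
    return heapq.nlargest(k, window_counts.items(), key=lambda kv: kv[1])


hour_counts = {"a": 5, "b": 9, "c": 1}
print(rebuild_top_k(hour_counts, 2))  # [('b', 9), ('a', 5)]
```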

Deep dive · why not Count-Min Sketch?

Classic heavy-hitters designs pair a Count-Min Sketch (fixed, sub-linear memory) with a heap. But CMS estimates can only overcount (hash collisions add, never subtract), so the ordering near the K-th boundary can be wrong, which is an approximation. Because the requirement says "no approximations," CMS cannot be the source of truth here.

Path | Memory | Accuracy | Use
Exact map + heap (chosen) | O(active videos) per window per shard | Exact | Authoritative
CMS + heap | O(d·w), fixed | Approximate (over-count) | Rejected as truth; optional pre-filter to decide which keys to track exactly
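A small Count-Min Sketch makes the over-count concrete: d rows of w counters, each row indexed by its own hash, with the estimate taken as the minimum across rows. The row hashes here come from salting BLAKE2b with the row index, an illustrative choice; `CountMinSketch` is a hypothetical class, not a library API.

```python
import hashlib


class CountMinSketch:
    """d rows x w counters; a key's estimate is the minimum of its d cells."""

    def __init__(self, d: int, w: int):
        self.d, self.w = d, w
        self.table = [[0] * w for _ in range(d)]

    def _cells(self, key: str):
        # One hash per row: BLAKE2b salted with the row index (an illustrative choice).
        for row in range(self.d):
            digest = hashlib.blake2b(
                key.encode(), digest_size=8, salt=row.to_bytes(8, "little")
            ).digest()
            yield row, int.from_bytes(digest, "little") % self.w

    def add(self, key: str, n: int = 1) -> None:
        for row, col in self._cells(key):
            self.table[row][col] += n

    def estimate(self, key: str) -> int:
        # Every cell holds the key's true count plus whatever collided with it.
        return min(self.table[row][col] for row, col in self._cells(key))


tiny = CountMinSketch(d=1, w=1)  # a single cell, so every key collides
tiny.add("a", 3)
tiny.add("b", 2)
print(tiny.estimate("a"), tiny.estimate("b"))  # 5 5
```

Both videos read as 5 even though their true counts are 3 and 2, so their relative order is lost. Wider and deeper tables make such collisions rarer but never impossible, which is exactly the boundary-ordering error the exact path avoids.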

Deep dive · fault tolerance & reconciliation

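Counter shards are stateful, so losing one must not lose counts. Each shard periodically checkpoints its counts to blob storage together with the Kafka offset those counts reflect, as one atomic snapshot; a replacement shard restores the latest snapshot and replays Kafka from that offset.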
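The scaling summary's "atomic checkpoint+offset snapshots" can be sketched with a local file standing in for blob storage and a Python list standing in for one Kafka partition, where an offset is the index of the next unprocessed record. `write_checkpoint` and `recover` are hypothetical helpers. The property being illustrated is that counts and offset are written as one unit, so recovery neither re-counts a view the snapshot already includes nor skips one it does not.

```python
import json
import os
import tempfile


def write_checkpoint(path: str, counts: dict[str, int], offset: int) -> None:
    """Persist counts and the log offset they reflect as ONE atomic snapshot."""
    snapshot = {"offset": offset, "counts": counts}
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory)  # same filesystem, so the rename is atomic
    with os.fdopen(fd, "w") as f:
        json.dump(snapshot, f)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp, path)  # readers see the old or the new snapshot, never a mix


def recover(path: str, log: list[str]) -> tuple[dict[str, int], int]:
    """Restore the last snapshot, then replay the log from its saved offset."""
    counts, offset = {}, 0
    if os.path.exists(path):
        with open(path) as f:
            snap = json.load(f)
        counts, offset = snap["counts"], snap["offset"]
    for video_id in log[offset:]:  # `log` stands in for a Kafka partition
        counts[video_id] = counts.get(video_id, 0) + 1
    return counts, len(log)  # new state and the next offset to consume
```

`os.replace` gives an atomic rename within one filesystem; a real blob store would need an equivalent single-object write, and a real consumer would resume reading its partition from the saved offset.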

Scaling summary

Scale along two axes kept in lockstep, Kafka partitions (ingest) and counter shards (state), targeting ~80% utilization. Reads scale via small in-memory heaps plus a few-second response cache. Exactness and durability come from videoId partitioning, atomic checkpoint+offset snapshots, and optional batch reconciliation.
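A back-of-the-envelope sizing helper for the two axes kept in lockstep, assuming a per-shard capacity figure that the notes do not give; the 50,000 views/s used below is a hypothetical placeholder. `shards_needed` is a hypothetical name, and under the lockstep rule its result is also the Kafka partition count.

```python
def shards_needed(ingest_per_sec: int, per_shard_capacity: int, target_util_pct: int = 80) -> int:
    """Shards (and matching Kafka partitions) needed to stay at or under target utilization."""
    usable = per_shard_capacity * target_util_pct  # capacity scaled by 100, kept in integers
    return -(-ingest_per_sec * 100 // usable)  # ceiling division


# Hypothetical per-shard capacity of 50,000 views/s (not from the notes):
print(shards_needed(1_000_000, 50_000))  # 25
```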