Storage & Media
YouTube Top-K Videos
Query the top K most-viewed videos (K ≤ 1000) for fixed sliding windows — minute, hour, day, month, all-time. A ~1M views/sec firehose flows through Kafka into counter shards that keep exact counts plus a size-K min-heap per window. The defining constraint is "no approximations" — the authoritative path must be exact.
Requirements
Functional
- Query the top-K (K ≤ 1000) most-viewed videos for a window ∈ {minute, hour, day, month, all-time}.
- Windows are sliding (hour = last 60 min, advancing); no arbitrary start/end, only fixed durations ending "now." This is what makes it tractable.
Non-functional
- Freshness < 1 minute; read latency 10–100 ms.
- ~1M views/sec; billions of videos; exact results (no approximation); HA, scalable, fault-tolerant.
Scale & back-of-the-envelope
- Catalog: 1M videos/day × 365 × 10 yr ≈ 3.6B videos.
- Ingest: 100B views/day ÷ ~100k s/day (86,400 rounded up) ≈ 1M views/sec; provision to ~1.25M/s (80% util, 20% headroom).
- Writes dominate by orders of magnitude vs reads → optimize ingest throughput; keep reads cheap via small in-memory heaps + short-TTL caching.
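The estimates above are simple enough to check mechanically. The following is a small sketch of that arithmetic; the variable names are illustrative and not part of the design.

```python
SECONDS_PER_DAY = 86_400
VIEWS_PER_DAY = 100_000_000_000

# Catalog: 1M uploads/day for 10 years.
catalog = 1_000_000 * 365 * 10

# Ingest using the rounded 100k s/day figure from the estimate above.
rounded_rate = VIEWS_PER_DAY // 100_000

# Ingest using the exact day length, for comparison.
exact_rate = VIEWS_PER_DAY / SECONDS_PER_DAY

# Capacity such that the rounded rate corresponds to 80% utilization.
provisioned = rounded_rate * 100 // 80

print(f"catalog      ≈ {catalog / 1e9:.2f}B videos")
print(f"rounded rate ≈ {rounded_rate / 1e6:.2f}M views/s")
print(f"exact rate   ≈ {exact_rate / 1e6:.2f}M views/s")
print(f"provisioned  ≈ {provisioned / 1e6:.2f}M views/s")
```

With the exact 86,400 s/day the average is closer to 1.16M views/sec, so the 1.25M/s target leaves somewhat less than 20% headroom over the unrounded figure.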
API design
GET /views/top?k={K}&window={minute|hour|day|month|all-time}
-> { "window":"day", "asOf":"...", "results":[ {"videoId":"abc","views":98342111}, ... ] }
# k clamped to [1,1000]; window is a fixed enum (no start/end params)
# Cache-Control: max-age=5 (still satisfies <1 min freshness)
Views are produced to Kafka (partitioned by videoId) rather than POSTed synchronously, to absorb 1M/s.
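The parameter rules for the read endpoint (clamp k to [1, 1000], accept only the fixed window enum) can be sketched as below. This is a minimal illustration, not a prescribed handler: the names `Window`, `parse_top_query`, and `DEFAULT_K`, and the choice of a default when k is omitted, are assumptions.

```python
from enum import Enum

MAX_K = 1000
DEFAULT_K = 10  # assumption: the notes do not specify a default


class Window(str, Enum):
    MINUTE = "minute"
    HOUR = "hour"
    DAY = "day"
    MONTH = "month"
    ALL_TIME = "all-time"


def parse_top_query(params: dict) -> tuple:
    """Return (k, window) from raw query-string parameters."""
    k = int(params.get("k", DEFAULT_K))
    k = max(1, min(MAX_K, k))          # clamp to [1, 1000]
    window = Window(params["window"])  # ValueError for anything outside the enum
    return k, window


print(parse_top_query({"k": "5000", "window": "day"}))
```

Because the enum is the only way to name a window, arbitrary start/end ranges cannot reach the service at all.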
High-level design
Client → load balancer → Top-K Service. The View Stream (Kafka) feeds Counter shards (each holding per-window counts + a top-K heap); Zookeeper coordinates shard ownership; checkpointing to blob storage persists shard state.
flowchart LR
Client(["Client"]) --> LB["Load Balancer"]
LB --> TKS["Top-K Service"]
Kafka["View Stream (Kafka)"]
subgraph Shard1["Counter Shard A"]
HC1["Hour Counts"] --> HH1["Hour Top-K Heap"]
DC1["Day Counts"] --> DH1["Day Top-K Heap"]
end
subgraph Shard2["Counter Shard B"]
HC2["Hour Counts"] --> HH2["Hour Top-K Heap"]
DC2["Day Counts"] --> DH2["Day Top-K Heap"]
end
Kafka --> HC1
Kafka --> DC1
Kafka --> HC2
Kafka --> DC2
TKS --> Shard1
TKS --> Shard2
TKS --> ZK["Zookeeper"]
Shard1 --> CP["Checkpointing (Blob Storage)"]
Shard2 --> CP
This design is exact (full integer counts, and videoId partitioning keeps each video's count whole on one shard), fresh (streaming updates apply in ms), fast to read (in-memory heaps), and scalable (add partitions + shards).
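Because each video's count lives whole on one shard, the Top-K Service can answer a query by asking every shard for its local top-K and keeping the K largest of the union. The sketch below illustrates that scatter-gather step. The function names are hypothetical, and CRC32 is only a stand-in for whatever stable hash the real partitioner uses.

```python
import heapq
import zlib


def shard_for(video_id: str, num_shards: int) -> int:
    """Map a videoId to a shard with a hash that is stable across processes."""
    return zlib.crc32(video_id.encode("utf-8")) % num_shards


def merge_top_k(per_shard: list, k: int) -> list:
    """Merge per-shard top-K lists of (video_id, count) into a global top-K.

    Exact only because no video appears on more than one shard.
    """
    candidates = (pair for shard in per_shard for pair in shard)
    return heapq.nlargest(k, candidates, key=lambda pair: pair[1])


shard_a = [("a", 10), ("b", 3)]
shard_b = [("c", 7), ("d", 1)]
print(merge_top_k([shard_a, shard_b], k=2))
```

If views for one video were spread across shards, the partial counts would have to be summed before ranking, and per-shard top-K lists would no longer be sufficient.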
Deep dive · counting with a size-K min-heap
Each shard keeps, per window, a Counts map (exact integers) and a min-heap of size K. The heap root is the smallest of the current top-K, which serves as the admission threshold. Most views are for non-top videos and cost only an O(1) map increment.
flowchart TD
V["View event: videoId"] --> R{"Hash videoId to shard"}
R --> CMap["Per-window Counts: videoId to count"]
CMap --> Inc["Increment exact count"]
Inc --> T{"New count exceeds heap-min?"}
T -- "yes" --> Upd["Update Top-K min-heap, re-heapify"]
T -- "no" --> Skip["Skip heap update (hot path)"]
Upd --> CP["Periodic checkpoint"]
Skip --> CP
Updates are O(log K) (with a videoId → heapIndex side map for in-place key updates); reads are O(K). The heap is purely a read-side index over the exact counts so the service never scans the whole map.
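A minimal sketch of the counting path described above: an exact counts map plus a size-K min-heap with a videoId → heapIndex side map. The class and method names (`TopKHeap`, `offer`, `record_view`) are illustrative assumptions, and the sketch covers only increasing counts, not window eviction.

```python
from collections import defaultdict


class TopKHeap:
    """Size-K min-heap of [count, video_id] with an index side map."""

    def __init__(self, k: int) -> None:
        self.k = k
        self.heap = []   # heap[0] holds the smallest count in the top-K
        self.pos = {}    # video_id -> index in self.heap

    def _swap(self, i: int, j: int) -> None:
        self.heap[i], self.heap[j] = self.heap[j], self.heap[i]
        self.pos[self.heap[i][1]] = i
        self.pos[self.heap[j][1]] = j

    def _sift_up(self, i: int) -> None:
        while i > 0:
            parent = (i - 1) // 2
            if self.heap[i][0] >= self.heap[parent][0]:
                return
            self._swap(i, parent)
            i = parent

    def _sift_down(self, i: int) -> None:
        n = len(self.heap)
        while True:
            smallest = i
            for child in (2 * i + 1, 2 * i + 2):
                if child < n and self.heap[child][0] < self.heap[smallest][0]:
                    smallest = child
            if smallest == i:
                return
            self._swap(i, smallest)
            i = smallest

    def offer(self, video_id: str, count: int) -> None:
        i = self.pos.get(video_id)
        if i is not None:
            # Already tracked: the count only grew, so the entry can only sink.
            self.heap[i][0] = count
            self._sift_down(i)
        elif len(self.heap) < self.k:
            self.heap.append([count, video_id])
            self.pos[video_id] = len(self.heap) - 1
            self._sift_up(len(self.heap) - 1)
        elif count > self.heap[0][0]:
            # Beats the admission threshold: replace the current minimum.
            del self.pos[self.heap[0][1]]
            self.heap[0] = [count, video_id]
            self.pos[video_id] = 0
            self._sift_down(0)
        # Otherwise: below the threshold, no heap work at all (hot path).

    def top(self) -> list:
        """Return (video_id, count) pairs, largest count first."""
        return sorted(((v, c) for c, v in self.heap), key=lambda p: -p[1])


def record_view(counts, heap: TopKHeap, video_id: str) -> None:
    counts[video_id] += 1                   # exact count, always updated
    heap.offer(video_id, counts[video_id])  # heap touched only when needed


counts = defaultdict(int)
heap = TopKHeap(k=2)
for vid in "aaaaabbbcdddd":
    record_view(counts, heap, vid)
print(heap.top())
```

Note that `top()` sorts its output, which costs O(K log K); returning the heap array unsorted would match the O(K) read mentioned above. An evicted video is not lost, since its exact count stays in the map and it is re-admitted as soon as it exceeds the heap minimum again.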
Deep dive · multi-window aggregation
Maintaining five independent counters is wasteful. Keep per-minute buckets and roll up: hour = sum of last 60, day = last 1440, month = last 43,200. Sliding works by advancing the ring and evicting buckets older than the window; all-time is a single monotonic counter served from the durable/batch store.
flowchart LR
subgraph Mins["Per-minute count buckets"]
B1["bucket t-59"]
B2["bucket t-1"]
B3["bucket t"]
end
B1 --> Hr["Hour rollup: sum last 60"]
B2 --> Hr
B3 --> Hr
Hr --> HrHeap["Hour Top-K Heap"]
B3 --> Day["Day rollup: sum last 1440"]
Day --> DayHeap["Day Top-K Heap"]
Evict["Evict buckets older than window"] --> Mins
Recompute each window's heap on its natural tick (hour heap each minute-rollup) — this bounds heap maintenance independently of the 1M/s ingest rate (still < 1 min stale).
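The bucket-and-rollup idea can be sketched as follows, assuming events arrive in non-decreasing minute order. `MinuteBuckets` and its methods are hypothetical names. For brevity the rollup re-sums the buckets in the window on each tick; a production version would more likely keep a running sum, adding the newest bucket and subtracting the evicted one.

```python
import heapq
from collections import Counter, deque


class MinuteBuckets:
    """Ring of per-minute exact counts, rolled up into longer windows on demand."""

    def __init__(self, max_minutes: int) -> None:
        self.max_minutes = max_minutes  # longest window served from the ring
        self.buckets = deque()          # (minute, Counter) pairs, oldest first

    def add(self, minute: int, video_id: str, n: int = 1) -> None:
        # Assumption: minutes never go backwards.
        if not self.buckets or self.buckets[-1][0] != minute:
            self.buckets.append((minute, Counter()))
        self.buckets[-1][1][video_id] += n
        # Evict buckets that have fallen out of the longest window.
        while self.buckets[0][0] <= minute - self.max_minutes:
            self.buckets.popleft()

    def top_k(self, now_minute: int, window_minutes: int, k: int) -> list:
        total = Counter()
        for minute, bucket in self.buckets:
            if now_minute - window_minutes < minute <= now_minute:
                total.update(bucket)    # Counter.update adds counts
        return heapq.nlargest(k, total.items(), key=lambda pair: pair[1])


ring = MinuteBuckets(max_minutes=1440)
ring.add(0, "a", 5)
ring.add(30, "b", 3)
ring.add(61, "a", 1)
ring.add(61, "c", 2)

hour_top = ring.top_k(now_minute=61, window_minutes=60, k=2)
day_top = ring.top_k(now_minute=61, window_minutes=1440, k=2)
print(hour_top, day_top)
```

Video "a" leads the day window but drops out of the hour window once its minute-0 bucket is more than 60 minutes old, which is the sliding behavior the rollup is meant to capture.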
Deep dive · why not Count-Min Sketch?
Classic heavy-hitters use a Count-Min Sketch (fixed sub-linear memory) + a heap. But CMS counts are over-estimates (collisions only add), so ordering near the K-th boundary can be wrong — an approximation. Because the requirement says "no approximations," CMS cannot be the source of truth here.
| Path | Memory | Accuracy | Use |
|---|---|---|---|
| Exact map + heap (chosen) | O(active videos)/window/shard | Exact | Authoritative |
| CMS + heap | O(d·w) fixed | Approximate (over-count) | Rejected as truth; optional pre-filter to decide which keys to track exactly |
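To make the over-count concrete, here is a toy Count-Min Sketch with a deliberately tiny width so that collisions are unavoidable. The class is an illustrative sketch, and the salted CRC32 row hashes are an assumption made only to keep the example self-contained.

```python
import zlib
from collections import Counter


class CountMinSketch:
    def __init__(self, depth: int, width: int) -> None:
        self.depth = depth
        self.width = width
        self.rows = [[0] * width for _ in range(depth)]

    def _cells(self, key: str):
        # One cell per row; the row number salts the hash.
        for row in range(self.depth):
            yield row, zlib.crc32(f"{row}:{key}".encode("utf-8")) % self.width

    def add(self, key: str) -> None:
        for row, col in self._cells(key):
            self.rows[row][col] += 1

    def estimate(self, key: str) -> int:
        # Minimum over rows: collisions can only inflate a cell, never shrink it.
        return min(self.rows[row][col] for row, col in self._cells(key))


exact = Counter()
cms = CountMinSketch(depth=2, width=4)
for i in range(50):
    key = f"video-{i}"
    for _ in range(i % 5 + 1):
        exact[key] += 1
        cms.add(key)

over = [k for k in exact if cms.estimate(k) > exact[k]]
print(f"{len(over)} of {len(exact)} keys are over-estimated")
```

With 50 keys and only 4 cells per row, at most 8 keys can have a cell to themselves, so most estimates exceed the exact count. Real sketches are far wider, but the same effect near the K-th boundary is what rules CMS out as the source of truth.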
Deep dive · fault tolerance & reconciliation
Counter shards are stateful, so losing one must not lose counts.
- Checkpointing: each shard atomically snapshots { counts, heaps, committed Kafka offset }; on restart, restore + replay Kafka from the saved offset. Persisting the offset with the counts makes recovery deterministic (no double counting).
- Zookeeper owns shard ↔ partition assignment, leader election, membership, and discovery; it drives rebalancing on node add/remove.
- Lambda layer: archive raw views to a data lake; a periodic MapReduce/Spark job recomputes exact top-K for closed windows + all-time. The service serves sealed batch results for closed windows + real-time deltas for the open window.
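The checkpoint-and-replay recovery described in the first bullet can be sketched as below. Everything here is a stand-in: a Python list plays the Kafka partition, a local JSON file plays blob storage, and the function names are hypothetical. Heaps are left out of the snapshot for brevity since they can be rebuilt from the counts.

```python
import json
import os
import tempfile
from collections import Counter


def checkpoint(path: str, counts: Counter, next_offset: int) -> None:
    """Write counts and offset in one file, then atomically swap it in."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"counts": counts, "next_offset": next_offset}, f)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp, path)  # readers see the old or the new snapshot, never a mix


def restore(path: str) -> tuple:
    with open(path) as f:
        snap = json.load(f)
    return Counter(snap["counts"]), snap["next_offset"]


def replay(counts: Counter, log: list, from_offset: int) -> int:
    """Apply events from from_offset onward; return the next offset to read."""
    for offset in range(from_offset, len(log)):
        counts[log[offset]] += 1
    return len(log)


log = ["a", "b", "a", "c", "a"]  # stand-in for one Kafka partition
path = os.path.join(tempfile.mkdtemp(), "shard0.json")

counts = Counter()
offset = replay(counts, log[:3], 0)  # consume the first three events
checkpoint(path, counts, offset)     # counts and offset saved together
replay(counts, log, offset)          # further progress, lost in a "crash"

recovered, saved_offset = restore(path)
replay(recovered, log, saved_offset)
print(recovered, saved_offset)
```

Because the offset is stored in the same snapshot as the counts, replay starts exactly where the snapshot ends, so events before the offset are not applied twice and events after it are not skipped.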
Scaling summary
Scale on two axes kept in lockstep, Kafka partitions (ingest) and counter shards (state), to ~80% utilization. Reads scale via small in-memory heaps + a few-second cache. Exactness & durability come from videoId partitioning, atomic checkpoint+offset snapshots, and optional batch reconciliation.