Sharded Counters
Decompose a hot counter into N shards and aggregate on read — the canonical fix for write hotspots.
Use cases#
A single integer counter is the simplest data structure imaginable — and one of the worst-scaling. Every increment is a write to the same row, the same key, the same shard. Concurrent increments serialize through whatever lock protects that one storage slot.
Sharded counters decompose that one counter into N independent counters and aggregate on read. Use cases:
- Like / heart / view counts on viral content. A tweet with 1M likes can absorb 100k+ increments/sec — impossible on a single row.
- Page view counts on a homepage taking 10k QPS.
- Real-time leaderboards where a top item gets thousands of writes/sec.
- Ad impressions, video views on hot content (#1 trending video on YouTube).
- Rate limiter aggregate buckets across many keys.
If your counter is “increment-only” or “increment-and-decrement, with no need for exact precision at millisecond granularity”, sharded counters are the right answer. If you need a precise atomic compare-and-set (bank balance, inventory level), you need a different pattern (transactions, optimistic locking).
Functional requirements#
- increment(counter_id, delta) — atomic increment of the logical counter.
- read(counter_id) — return the current total (sum of shards).
- Optional: decrement, reset, range queries (count between t1 and t2).
- Optional: per-dimension counts (count by country, count by hour).
Non-functional requirements#
- Write throughput: scale linearly with shard count. A single row hits ~1k writes/sec on commodity DB; 100 shards → 100k writes/sec.
- Read latency: a read aggregates N shards. For N = 100, fan-out cost matters — keep reads cached.
- Consistency: reads are eventually consistent — they reflect the sum of shards at some recent moment, not a globally serializable snapshot.
- Accuracy: count is exact in steady state. During concurrent increments, an in-flight read can miss in-flight increments — almost always acceptable.
High-level design#
                                                ┌── counter:tweet_42:shard_0  ──> 12,340
                                                ├── counter:tweet_42:shard_1  ──> 12,287
increment ──> hash(client_id) % N ──> one shard     ...
                                                ├── counter:tweet_42:shard_98 ──> 12,401
                                                └── counter:tweet_42:shard_99 ──> 12,355

read ──> sum(SELECT value FROM counter:tweet_42:shard_*)
optional: cache the sum, refresh every 1-5 s

Each increment lands on one of N shards based on hash(some_random_or_partition_key) % N. Each shard has its own row; per-shard increments are uncontended. Reads sum across shards.
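The write and read paths described above can be sketched in memory. This is a minimal illustration, not a storage-backed implementation: the class name `ShardedCounter`, the helper `hammer`, and the use of one lock per shard as a stand-in for one row per shard are all assumptions made for the example.

```python
import random
import threading


class ShardedCounter:
    """In-memory stand-in for N shard rows; each shard has its own lock."""

    def __init__(self, num_shards: int) -> None:
        self._values = [0] * num_shards
        self._locks = [threading.Lock() for _ in range(num_shards)]

    def increment(self, delta: int = 1) -> None:
        # Pick one shard at random; only that shard's lock is taken.
        i = random.randrange(len(self._values))
        with self._locks[i]:
            self._values[i] += delta

    def read(self) -> int:
        # Sum without a global lock: the result may miss in-flight increments.
        return sum(self._values)


def hammer(counter: ShardedCounter, threads: int, per_thread: int) -> int:
    """Run concurrent increments and return the total once all writers finish."""

    def work() -> None:
        for _ in range(per_thread):
            counter.increment()

    workers = [threading.Thread(target=work) for _ in range(threads)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return counter.read()
```

Once every writer has finished, the sum is exact; a read taken while writers are still running can lag, which matches the eventual-consistency requirement above. In CPython the per-shard locks show the structure rather than a real throughput gain.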
Detailed design#
Choosing shard count#
- N = 1 — original problem; useless.
- N = 10 — 10× write capacity; reads still trivially fast.
- N = 100 — handles tens of thousands of writes/sec on Postgres; read fan-out 100 still <10 ms.
- N = 1000 — write-heavy at scale; read latency starts to matter; consider read-side caching.
- N = 10000 — extreme write scale; reads are expensive; use approximation (HLL, count-min).

Pick N ≈ 10 × (target write QPS ÷ per-row write capacity), where per-row capacity is roughly 1000/s on Postgres, 10000/s on Redis, and 100/s on a single Cassandra row. The factor of 10 is headroom. It’s better to over-shard than under-shard at design time, because adding shards later is hard.
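The sizing rule can be written as a one-line calculation. The function name `shard_count` and the `headroom` parameter are assumptions for this sketch; the capacity figures are the rough per-row numbers quoted in the text.

```python
import math

# Per-row write capacities quoted in the text (writes/sec); rough figures.
PER_ROW_CAPACITY = {"postgres": 1_000, "redis": 10_000, "cassandra": 100}


def shard_count(target_qps: int, per_row_capacity: int, headroom: int = 10) -> int:
    """N = headroom * target_qps / per_row_capacity, rounded up, at least 1."""
    return max(1, math.ceil(headroom * target_qps / per_row_capacity))
```

For example, 10,000 writes/sec on Postgres gives `shard_count(10_000, PER_ROW_CAPACITY["postgres"])`, which is 100 shards, in line with the N = 100 row of the list above.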
Picking a shard per increment#
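#include <cstdlib>
#include <functional>
#include <string>

const int N = 100;  // shard count (example value)

// Option 1: pure random. Best write distribution, no key locality.
int pick_shard_random() { return std::rand() % N; }

// Option 2: hash of client / session. The same client always hits the same shard
// (useful if you also want per-client rate limits in the same shard).
int pick_shard_by_client(const std::string& client_id) {
    return static_cast<int>(std::hash<std::string>{}(client_id) % N);
}

// Option 3: time-bucketed. The shard rotates over time; helps if you need
// recent-window queries efficiently.
int pick_shard_by_time(const std::string& counter_id, long current_minute) {
    return static_cast<int>((current_minute + std::hash<std::string>{}(counter_id)) % N);
}

Pure random gives the smoothest write distribution. Hash-by-client gives stickier locality at the cost of slightly uneven distribution.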
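The difference between the random and hash-by-client strategies is easiest to see with a skewed workload. The sketch below is a hypothetical experiment: the client names, the 50/50 traffic split, `N = 10`, and the helper names are all made up for illustration.

```python
import hashlib
import random
from collections import Counter

N = 10  # example shard count (an assumption for this demo)


def stable_hash_shard(client_id: str) -> int:
    # hashlib gives the same value across processes, unlike built-in hash().
    digest = hashlib.sha256(client_id.encode()).digest()
    return int.from_bytes(digest[:8], "big") % N


def writes_per_shard(client_ids, pick) -> Counter:
    """Count how many writes each shard receives for a stream of client ids."""
    return Counter(pick(c) for c in client_ids)


# One heavy client issues half of all writes; 500 light clients issue the rest.
rng = random.Random(0)
stream = ["heavy"] * 500 + [f"client_{i}" for i in range(500)]

by_client = writes_per_shard(stream, stable_hash_shard)
by_random = writes_per_shard(stream, lambda _c: rng.randrange(N))
```

With hash-by-client, the heavy client's 500 writes all land on one shard, so that shard is hot again. Random selection spreads the same stream roughly evenly across the 10 shards.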
Read aggregation#
SELECT SUM(value)
FROM counters
WHERE counter_id = 'tweet_42';

With N = 100 and a covering index on counter_id, a single index seek plus 100 row fetches completes in low milliseconds. For N = 10k+, keep the fan-out off the read path:
- Pre-aggregated batch — a background job sums shards every 1-5 s and writes to a single counter_total row. Reads hit that one row.
- Cached at edge — front the counter with Distributed Cache; reads hit cache, miss falls back to aggregated read.
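The pre-aggregated variant can be tried end to end with SQLite from the Python standard library. This is a sketch under stated assumptions: the `counters` and `counter_total` table names come from the text, but the column layout, the `shard_id` column, and the function names are invented for the example, and a real deployment would run `refresh_total` from a scheduled job.

```python
import random
import sqlite3

N = 100  # example shard count

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE counters ("
    " counter_id TEXT, shard_id INTEGER, value INTEGER NOT NULL DEFAULT 0,"
    " PRIMARY KEY (counter_id, shard_id))"
)
db.execute("CREATE TABLE counter_total (counter_id TEXT PRIMARY KEY, value INTEGER)")


def create_counter(counter_id: str) -> None:
    # Pre-create one row per shard so increments are plain UPDATEs.
    db.executemany(
        "INSERT INTO counters (counter_id, shard_id) VALUES (?, ?)",
        [(counter_id, i) for i in range(N)],
    )


def increment(counter_id: str, delta: int = 1) -> None:
    db.execute(
        "UPDATE counters SET value = value + ? WHERE counter_id = ? AND shard_id = ?",
        (delta, counter_id, random.randrange(N)),
    )


def refresh_total(counter_id: str) -> None:
    """What the background job would run every 1-5 s: sum shards into one row."""
    db.execute(
        "INSERT OR REPLACE INTO counter_total (counter_id, value)"
        " SELECT ?, COALESCE(SUM(value), 0) FROM counters WHERE counter_id = ?",
        (counter_id, counter_id),
    )


def read_total(counter_id: str) -> int:
    """Cheap read path: a single-row lookup that may lag by one refresh."""
    row = db.execute(
        "SELECT value FROM counter_total WHERE counter_id = ?", (counter_id,)
    ).fetchone()
    return row[0] if row else 0
```

Reads never touch the shard rows, so read cost stays constant as N grows; the price is staleness bounded by the refresh interval.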
Adaptive sharding#
Hot counters need many shards; cold counters need one. Giving every counter the same shard count wastes storage on the cold ones. Adaptive schemes:
- Auto-split — start at N=1. When a shard’s write rate exceeds threshold, split that counter into N shards.
- Heat-based — every counter has metadata; high-write counters land on more shards.
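One way to picture the auto-split scheme is a counter that starts with a single shard and doubles its shard count whenever the writes seen in the current second exceed what the existing shards can absorb. The class name, the doubling policy, the one-second window, and the explicit `now` argument (passed in so the example is deterministic) are all assumptions of this sketch, not a documented algorithm.

```python
import random


class AdaptiveCounter:
    """Starts with one shard and doubles the shard count when writes run hot."""

    def __init__(self, per_shard_limit: int, max_shards: int = 1024) -> None:
        self.shards = [0]
        self.per_shard_limit = per_shard_limit  # writes/sec one shard may take
        self.max_shards = max_shards
        self._window = None  # the one-second window currently being counted
        self._window_writes = 0

    def increment(self, now: float, delta: int = 1) -> None:
        window = int(now)
        if window != self._window:
            self._window, self._window_writes = window, 0
        self._window_writes += 1
        # Split when this second's writes exceed what the current shards can take.
        if self._window_writes > self.per_shard_limit * len(self.shards):
            grow = min(len(self.shards), self.max_shards - len(self.shards))
            self.shards.extend([0] * grow)
        self.shards[random.randrange(len(self.shards))] += delta

    def read(self) -> int:
        # New shards start at zero, so the sum stays correct across splits.
        return sum(self.shards)
```

A counter that receives 1,000 writes in one second with `per_shard_limit=100` ends up with 16 shards, while one that receives 50 writes per second stays at a single shard.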
Google Cloud Datastore famously documented “do not write to one entity more than once per second” — sharded counters were the recommended workaround for hot writes.
Negative counts and decrements#
Decrement is symmetric: pick a shard, subtract. But individual shards can go negative even when the total is positive (one shard had only +5 increments and got 10 decrements). For display, sum across shards; for count > 0 predicates, sum first.
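The example in parentheses can be worked through directly. The helper names `apply` and `is_positive` are hypothetical; the point is only that a predicate must look at the sum, never at an individual shard.

```python
def apply(shards: list[int], ops: list[tuple[int, int]]) -> list[int]:
    """Apply (shard_index, delta) operations; delta may be negative."""
    out = list(shards)
    for index, delta in ops:
        out[index] += delta
    return out


def is_positive(shards: list[int]) -> bool:
    # Sum first, then test: a per-shard check would give the wrong answer.
    return sum(shards) > 0


# Shard 0 receives 5 increments and 10 decrements; shard 1 receives 20 increments.
shards = apply([0, 0], [(0, +5), (0, -10), (1, +20)])
```

Here shard 0 ends at -5 and shard 1 at 20, so the logical count is 15 even though one shard is negative.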
Approximate counting#
When even sharded counters cost too much, approximation:
- HyperLogLog — cardinality estimation (count of distinct items, e.g. unique visitors). Constant-size per counter (12 KB in Redis, with a standard error of about 0.81%), mergeable across shards.
- Count-Min Sketch — frequency estimation across many items in fixed memory.
- Probabilistic counters (Morris, 1978) — log-scale counters that approximate large counts with logarithmic storage.
Redis exposes HLL via PFADD / PFCOUNT. Twitter and Discord both use it for “approximate unique X” metrics.
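Of the three techniques listed, Count-Min Sketch is the simplest to show in a few lines. The sketch below is a minimal version under assumed parameters (`width=1024`, `depth=4`) and uses salted BLAKE2b from `hashlib` as the per-row hash; production implementations usually choose width and depth from a target error bound.

```python
import hashlib


class CountMinSketch:
    """Fixed-size frequency table: estimates never undercount, may overcount."""

    def __init__(self, width: int = 1024, depth: int = 4) -> None:
        self.width, self.depth = width, depth
        self.rows = [[0] * width for _ in range(depth)]

    def _index(self, row: int, item: str) -> int:
        # One hash function per row, derived by salting with the row number.
        digest = hashlib.blake2b(item.encode(), salt=bytes([row])).digest()
        return int.from_bytes(digest[:8], "big") % self.width

    def add(self, item: str, count: int = 1) -> None:
        for r in range(self.depth):
            self.rows[r][self._index(r, item)] += count

    def estimate(self, item: str) -> int:
        # Collisions only inflate cells, so the minimum is the tightest bound.
        return min(self.rows[r][self._index(r, item)] for r in range(self.depth))
```

Memory is fixed at `width * depth` cells no matter how many distinct items are counted, and two sketches built with the same parameters can be merged by adding their cells.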
Time-bucketed counters#
For “views in the last hour” rather than “lifetime views”:
counter_id = "tweet_42:hour_2024051610"

Each hour gets its own counter (sharded internally). Reads sum recent buckets. Old buckets expire via TTL. The same pattern powers rate limiters (current_minute bucket).
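The key scheme and the read path can be sketched as follows. The function names are hypothetical, a plain `dict` stands in for the store, and the example assumes timezone-aware datetimes that are normalized to UTC before the bucket is formatted.

```python
from datetime import datetime, timedelta, timezone


def hour_bucket(counter_id: str, when: datetime) -> str:
    """Key for the UTC hour containing `when`, e.g. 'tweet_42:hour_2024051610'."""
    return f"{counter_id}:hour_{when.astimezone(timezone.utc):%Y%m%d%H}"


def recent_buckets(counter_id: str, now: datetime, hours: int) -> list[str]:
    """Keys for the current hour and the previous `hours - 1` hours."""
    return [hour_bucket(counter_id, now - timedelta(hours=h)) for h in range(hours)]


def views_in_window(store: dict[str, int], counter_id: str, now: datetime, hours: int) -> int:
    # Missing buckets (expired by TTL or never written) count as zero.
    return sum(store.get(key, 0) for key in recent_buckets(counter_id, now, hours))
```

Because buckets align to clock hours, a "last hour" read is accurate only to bucket granularity; smaller buckets (for example per minute) tighten the window at the cost of more keys per read.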
Trade-offs#
Key axes:
- Strong consistency vs eventual consistency — if you need exact count = K → reject behavior (inventory: don’t oversell), sharded counters are wrong. Use a transactional row with optimistic concurrency.
- In-storage vs in-cache counters — Redis INCR on a sharded set of keys is faster than DB writes but loses on durability. For “view counts where loss-of-10-views is fine”, Redis wins.
- Per-key shard count — static N is simple; adaptive N gives better storage efficiency at operational cost.
- Eager aggregation vs lazy — pre-aggregating every few seconds makes reads cheap but adds a background job. Lazy aggregation keeps the system simpler.
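For contrast with the first trade-off, here is roughly what the non-sharded alternative looks like: a single row guarded by a version column, where a write succeeds only if the row is unchanged since it was read. The `inventory` table, its columns, and `reserve_one` are hypothetical names for this sketch, which uses SQLite from the Python standard library.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE inventory (sku TEXT PRIMARY KEY, stock INTEGER, version INTEGER)")
db.execute("INSERT INTO inventory VALUES ('widget', 1, 0)")


def reserve_one(sku: str) -> bool:
    """Try to take one unit; returns False if sold out or if another writer won."""
    stock, version = db.execute(
        "SELECT stock, version FROM inventory WHERE sku = ?", (sku,)
    ).fetchone()
    if stock <= 0:
        return False
    # The write only applies if nobody changed the row since we read it.
    cur = db.execute(
        "UPDATE inventory SET stock = stock - 1, version = version + 1"
        " WHERE sku = ? AND version = ?",
        (sku, version),
    )
    return cur.rowcount == 1
```

Every writer contends on the same row, which is exactly the hotspot sharding removes, but in exchange the stock can never go below zero. A caller that gets `False` because of a version conflict would typically re-read and retry.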
Real-world examples#
- Google Cloud Datastore / Firestore — the canonical sharded-counter doc lives in Google’s official docs, recommending N=10-20 for moderate hotness.
- YouTube view counts — sharded counters per video; aggregated into a single visible count refreshed every few minutes. The famous “stuck at 301 views” was a side effect of the verification + sharding pipeline.
- Twitter / X like counts — sharded counters per tweet; for celebrity tweets, the visible count is cached aggressively and updated asynchronously.
- Reddit upvote / downvote counts — sharded counters per post; aggregated periodically into the score.
- Instagram likes — sharded counters per post; the recent total is denormalized into the post’s metadata.
- Riak’s counter CRDT — multi-master sharded counter that converges eventually under partitions.
- Cloudflare analytics — count-min sketches and HLL replace exact counters at edge scale.
Related building blocks#
- Distributed Cache — caches the aggregated total to avoid fanning out on every read.
- Key-Value Store — stores the shard rows; Redis INCR is the canonical primitive.
- Rate Limiter — also uses time-bucketed sharded counters internally.
- Distributed Monitoring — metrics counters at scale use the same decomposition.