Designing a Distributed Key-Value Store
Let's design a distributed key-value store like Amazon DynamoDB or Apache Cassandra from scratch. We'll focus on practical design decisions and implementation details that make these systems work at scale.
System Requirements
Functional & Non-Functional Requirements
- • Small key-value pairs: Less than 10 KB each
- • Big data storage: Support billions of key-value pairs
- • High availability: System responds even during failures
- • High scalability: Horizontally scalable to support large datasets
- • Automatic scaling: Add/remove servers based on traffic
- • Tunable consistency: Choose consistency level per operation
- • Low latency: Single-digit-millisecond response times
API Design
A key-value store provides a simple API:
# Get value for a key
get(key) → value
# Store a key-value pair
put(key, value) → acknowledgment
# Delete a key-value pair
delete(key) → acknowledgment

📝 Design Decision
We keep the API simple. Complex queries requiring joins or ranges are better served by traditional databases. This simplicity enables extreme optimization for speed and scale.
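To pin down the contract before distributing it, here is a single-node, in-memory stand-in for the same three calls (illustrative only; the rest of the design is about running this across many machines):

class InMemoryKVStore:
    """Single-node stand-in exposing the same get/put/delete contract."""
    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value
        return "OK"                        # acknowledgment

    def get(self, key):
        return self._data.get(key)         # None if the key is absent

    def delete(self, key):
        return "OK" if self._data.pop(key, None) is not None else "NOT_FOUND"

store = InMemoryKVStore()
store.put("session:abc123", {"user_id": 42})
print(store.get("session:abc123"))      # {'user_id': 42}
print(store.delete("session:abc123"))   # OK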
Core Component 1: Data Partitioning
🔄 Consistent Hashing for Data Distribution
We use consistent hashing to partition data across nodes. This minimizes data movement when nodes are added or removed.
How Consistent Hashing Works:
Hash Ring (0 to 2^32-1)
0°/360°
│
N1 ──┼── N4
\ │ /
\ │ /
270° ──┼── 90°
/ │ \
/ │ \
N3 ──┼── N2
│
180°
Key "user:123" → hash(key) = 45° → Stored on N4
Key "order:456" → hash(key) = 120° → Stored on N2✅ Benefits
- • Adding a node affects only ~1/N of the data
- • Removing a node redistributes only its own data
- • No central directory needed
- • Each node manages ~360°/N range
🔧 Virtual Nodes
- • Each physical node = 100-200 virtual nodes
- • Better load distribution
- • Handles heterogeneous hardware
- • Example: Node1 → vnode1, vnode101, vnode201...
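Here is a minimal sketch of the ring with virtual nodes (hash function choice, vnode count, and class names are illustrative): each physical node is hashed onto the ring at many positions, and a key's owner is the first virtual node clockwise from the key's hash.

import hashlib
from bisect import bisect_right

def ring_hash(s):
    """Map a string onto the 0 .. 2^32 - 1 ring (MD5 truncated to 32 bits)."""
    return int(hashlib.md5(s.encode()).hexdigest(), 16) % (2 ** 32)

class HashRing:
    def __init__(self, nodes, vnodes_per_node=150):
        # Each physical node appears on the ring at many virtual positions
        self.ring = sorted(
            (ring_hash(f"{node}#vnode{i}"), node)
            for node in nodes
            for i in range(vnodes_per_node)
        )
        self.positions = [pos for pos, _ in self.ring]

    def owner(self, key):
        """First virtual node clockwise from the key's position (wraps around)."""
        idx = bisect_right(self.positions, ring_hash(key)) % len(self.ring)
        return self.ring[idx][1]

ring = HashRing(["node1", "node2", "node3", "node4"])
print(ring.owner("user:123"))   # one of node1..node4, stable across calls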
Core Component 2: Data Replication
📋 N-Way Replication Strategy
Each key is replicated on N nodes (typically N=3) for fault tolerance. The first N healthy nodes clockwise from the key's position store replicas.
Replication Example (N=3):
Key "user:123" hashes to position between N3 and N4
0°/360°
│
N1 ──┼── N4 ← Replica 1 (Primary)
\ │ /
\ │ /
270° ──┼── 90°
/ │ \ Key "user:123"
/ │ \ hashes here ●
N3 ──┼── N2 ← Replica 3
│ ↑ Replica 2
180°
Replicas: N4 (primary), N1, N2

⚠️ Important Considerations
- • With virtual nodes, ensure replicas on different physical servers
- • Consider rack-aware and datacenter-aware replication
- • Replicas can be synchronous or asynchronous based on consistency needs
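Building on the HashRing sketch from the partitioning section, replica placement can be sketched as walking clockwise from the key's position and taking the first N virtual nodes that belong to distinct physical servers, so replicas never collapse onto one machine:

from bisect import bisect_right

def replicas_for_key(ring, key, n=3):
    """Return the N distinct physical nodes clockwise from the key's position."""
    start = bisect_right(ring.positions, ring_hash(key)) % len(ring.ring)
    chosen, seen = [], set()
    for step in range(len(ring.ring)):                  # walk the ring at most once
        _, node = ring.ring[(start + step) % len(ring.ring)]
        if node not in seen:                            # skip vnodes of already-chosen servers
            seen.add(node)
            chosen.append(node)
            if len(chosen) == n:
                break
    return chosen

print(replicas_for_key(ring, "user:123"))   # e.g. ['node4', 'node1', 'node2']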
Core Component 3: Consistency Models
⚖️ Tunable Consistency with Quorums
We use quorum-based consistency where clients can choose consistency levels per operation.
Quorum Formula:
N = Number of replicas
W = Write quorum (nodes that must acknowledge write)
R = Read quorum (nodes that must respond to read)
W + R > N ⇒ strong consistency (every read quorum overlaps every write quorum in at least one replica)

Common Configurations:
| Configuration | N, W, R | Characteristics | Use Case |
|---|---|---|---|
| Strong Consistency | N=3, W=3, R=1 | All writes go to all replicas | Financial transactions |
| Read Heavy | N=3, W=3, R=1 | Fast reads, slow writes | Product catalog |
| Write Heavy | N=3, W=1, R=3 | Fast writes, slow reads | Logging, metrics |
| Balanced | N=3, W=2, R=2 | W+R>N ensures consistency | User sessions |
| Eventual Consistency | N=3, W=1, R=1 | Fastest, may read stale data | Social media posts |
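As a quick sanity check of the W + R > N rule against the table above, a few illustrative lines of Python:

def quorums_overlap(n, w, r):
    """W + R > N guarantees every read quorum shares at least one replica with every write quorum."""
    return w + r > n

configs = {
    "Strong":      (3, 3, 1),
    "Read heavy":  (3, 3, 1),
    "Write heavy": (3, 1, 3),
    "Balanced":    (3, 2, 2),
    "Eventual":    (3, 1, 1),
}
for name, (n, w, r) in configs.items():
    level = "strong" if quorums_overlap(n, w, r) else "eventual"
    print(f"{name:12s} N={n} W={w} R={r} -> {level}")   # only the last config is eventual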
Coordinator Decision Flow
🧠 How Coordinator Enforces Consistency Levels
The coordinator node receives client requests with consistency requirements and orchestrates the appropriate read/write strategy.
📌 Important: The "coordinator" is not a special node type - it's a role that any regular cluster node can take on for a specific request. See the architecture section below for details.
Coordinator Decision Process:
Client Request: PUT key="user:456" value="data" consistency_level=QUORUM
Coordinator Logic:
1. Parse consistency level → W=2, R=2 (for N=3)
2. Hash key → determine replicas [Node A, Node B, Node C]
3. Execute write strategy based on consistency level
┌─────────────────────────────────────────────────────────────┐
│ WRITE CONSISTENCY FLOWS │
└─────────────────────────────────────────────────────────────┘
EVENTUAL (W=1): QUORUM (W=2): STRONG (W=3):
Client Client Client
│ │ │
▼ ▼ ▼
Coordinator Coordinator Coordinator
│ │ │
└──── Write to ANY 1 ──── ├──── Write to ANY 2 ── ├──── Write to ALL 3 ──
replica (fastest) │ replicas (majority) │ replicas (all)
│ │
Node A Node B Node C Node A Node B Node C Node A Node B Node C
✓ - - ✓ ✓ - ✓ ✓ ✓
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
ACK (fast) ACK ACK ACK ACK ACK
│ (wait for 2) (wait for all 3)
▼ │ │
SUCCESS ▼ ▼
(may be inconsistent) SUCCESS SUCCESS
(consistent) (strongly consistent)
┌─────────────────────────────────────────────────────────────┐
│ READ CONSISTENCY FLOWS │
└─────────────────────────────────────────────────────────────┘
EVENTUAL (R=1): QUORUM (R=2): STRONG (R=3):
Client Client Client
│ │ │
▼ ▼ ▼
Coordinator Coordinator Coordinator
│ │ │
└──── Read from ANY 1 ──── ├──── Read from ANY 2 ── ├──── Read from ALL 3 ──
replica (fastest) │ replicas (majority) │ replicas (all)
│ │
Node A Node B Node C Node A Node B Node C Node A Node B Node C
✓ - - ✓ ✓ - ✓ ✓ ✓
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
value1 value1 value2 value1 value2 value3
clock:{A:5} clock: clock: clock: clock: clock:
{A:5} {A:5} {A:5} {A:5} {A:5}
│ │ │ │ │ │
▼ └──────┼──────────────────────┴───────────┘
return value1 ▼ │
(may be stale) compare clocks ▼
return latest compare all clocks
(read repair async) return latest value
(read repair async)

Coordinator Algorithm for Different Consistency Levels:
def handle_write_request(key, value, consistency_level):
    replicas = get_replicas_for_key(key)  # e.g. [NodeA, NodeB, NodeC]
    N = len(replicas)                     # replication factor, typically 3
    if consistency_level == "ONE":
        W = 1
        # Wait for the single fastest replica to acknowledge
        future_writes = async_write_to_all(replicas, key, value)
        wait_for_acks(future_writes, count=W)
        return SUCCESS
    elif consistency_level == "QUORUM":
        W = (N // 2) + 1  # For N=3, W=2
        # Wait for a majority of replicas to acknowledge
        future_writes = async_write_to_all(replicas, key, value)
        wait_for_acks(future_writes, count=W)
        return SUCCESS
    elif consistency_level == "ALL":
        W = N  # All replicas must acknowledge
        future_writes = async_write_to_all(replicas, key, value)
        wait_for_acks(future_writes, count=W)
        return SUCCESS

def handle_read_request(key, consistency_level):
    replicas = get_replicas_for_key(key)
    N = len(replicas)
    if consistency_level == "ONE":
        # Read from any one replica (fastest, possibly stale)
        return async_read_from_any(replicas, key)
    elif consistency_level == "QUORUM":
        R = (N // 2) + 1  # For N=3, R=2
        responses = async_read_from_all(replicas, key)
        quorum_responses = wait_for_responses(responses, count=R)
        latest_value = resolve_conflicts(quorum_responses)  # compare vector clocks
        async_read_repair(replicas, latest_value)            # fix stale replicas asynchronously
        return latest_value
    elif consistency_level == "ALL":
        # Read from all replicas and reconcile before returning
        responses = async_read_from_all(replicas, key)
        all_responses = wait_for_responses(responses, count=N)
        return resolve_conflicts(all_responses)

🚀 Eventual Consistency
- • Write: Wait for 1 ACK (fastest)
- • Read: Return from 1 replica
- • Latency: Lowest
- • Use case: Social feeds, counters
⚖️ Quorum Consistency
- • Write: Wait for majority ACKs
- • Read: Read from majority, compare
- • Latency: Medium
- • Use case: User profiles, preferences
🔒 Strong Consistency
- • Write: Wait for ALL ACKs
- • Read: Read from ALL replicas
- • Latency: Highest
- • Use case: Financial data, inventory
Core Component 4: Inconsistency Resolution
🔄 Vector Clocks for Conflict Resolution
When concurrent writes occur, we need to detect and resolve conflicts. Vector clocks track causality between events.
💡 CAP Theorem Connection: Conflict resolution is needed for high availability (AP) systems, where we accept writes during partitions. In strongly consistent (CP) systems, we reject writes when we can't reach a quorum, preventing conflicts but reducing availability.
Vector Clock Example:
Initial: key="cart:user1" value=[] vector_clock={}
Client A writes:
key="cart:user1" value=["item1"] vector_clock={A:1}
Client B reads then writes (concurrent with A):
key="cart:user1" value=["item2"] vector_clock={B:1}
Conflict Detection:
{A:1} and {B:1} are concurrent (neither happened-before the other)
Resolution:
1. Last-Write-Wins: Use timestamp, lose data
2. Application-level: Merge carts ["item1", "item2"]
3. CRDTs: Automatically mergeable data structures

🕐 Last-Write-Wins
- • Simple implementation
- • Based on synchronized timestamps
- • Potential data loss
- • Good for cache-like data
🔀 Application Resolution
- • Client resolves conflicts
- • Domain-specific logic
- • No data loss
- • Example: Shopping cart merge
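A minimal sketch of conflict detection and an application-level merge, using the `{node: counter}` clock representation from the example above (helper names are illustrative):

def happened_before(a, b):
    """True if clock a causally precedes clock b (a <= b on every entry, a != b)."""
    keys = set(a) | set(b)
    return a != b and all(a.get(k, 0) <= b.get(k, 0) for k in keys)

def is_concurrent(a, b):
    """Neither clock precedes the other: the writes conflict and must be resolved."""
    return a != b and not happened_before(a, b) and not happened_before(b, a)

def merge_carts(cart1, clock1, cart2, clock2):
    """Application-level resolution for carts: union the items, merge the clocks."""
    merged_clock = {k: max(clock1.get(k, 0), clock2.get(k, 0))
                    for k in set(clock1) | set(clock2)}
    return sorted(set(cart1) | set(cart2)), merged_clock

write_a = (["item1"], {"A": 1})
write_b = (["item2"], {"B": 1})
print(is_concurrent(write_a[1], write_b[1]))   # True: neither write saw the other
print(merge_carts(*write_a, *write_b))         # (['item1', 'item2'], {'A': 1, 'B': 1})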
Core Component 5: Handling Failures
🛡️ Failure Detection & Recovery
1. Failure Detection: Gossip Protocol
Every T seconds, each node:
- Increments its heartbeat counter
- Randomly selects another node
- Shares its membership list with heartbeats
- Updates local membership list
Node marked as failed if heartbeat not updated for T_fail seconds
Example membership list at Node A:
┌─────────┬───────────┬──────────────┐
│ Node │ Heartbeat │ Last Updated │
├─────────┼───────────┼──────────────┤
│ Node A │ 142 │ Now │
│ Node B │ 140 │ 2 sec ago │
│ Node C │ 135 │ 7 sec ago │ ← Might be failing
│ Node D │ 141 │ 1 sec ago │
└─────────┴───────────┴──────────────┘

🔍 Deep Dive: How Gossip Protocol Really Works
🧮 Merge Rule: when a node receives a gossiped membership list, it keeps the higher heartbeat count for each entry:
# For each peer, keep the larger of the local and received heartbeat counters
for i in range(num_nodes):
    local[i] = max(local[i], received[i])
💀 Failure Detection Logic
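A minimal sketch of the check each node runs against its local membership table (field names and T_FAIL are illustrative, assuming Unix-time "last updated" stamps as in the table above):

import time

T_FAIL = 10  # seconds without a heartbeat update before a peer is suspected

def detect_failures(membership, now=None):
    """membership: node_id -> {"heartbeat": int, "last_updated": unix seconds}.
    Return peers whose heartbeat has not advanced within T_FAIL seconds."""
    now = now if now is not None else time.time()
    return [node for node, entry in membership.items()
            if now - entry["last_updated"] > T_FAIL]

membership = {
    "Node B": {"heartbeat": 140, "last_updated": time.time() - 2},
    "Node C": {"heartbeat": 135, "last_updated": time.time() - 12},  # stale entry
    "Node D": {"heartbeat": 141, "last_updated": time.time() - 1},
}
print(detect_failures(membership))  # ['Node C']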
2. Temporary Failures: Sloppy Quorum & Hinted Handoff
When a node is temporarily unavailable, we use hinted handoff to maintain availability:
Normal: key "K" → replicas on [N1, N2, N3]
N3 is down temporarily
Write arrives:
1. Write to N1, N2 (meet quorum W=2)
2. N4 temporarily holds N3's write with hint
3. When N3 recovers, N4 transfers hinted writes
┌────┐ ┌────┐ ┌────┐ ┌────┐
│ N1 │ │ N2 │ │ N3 │ │ N4 │
│ ✓ │ │ ✓ │ │ ✗ │ │hint│
└────┘ └────┘ └────┘ └────┘
↑ ↓
Down temporarily
Hint: "Give to N3 when back"3. Permanent Failures: Anti-Entropy with Merkle Trees
3. Permanent Failures: Anti-Entropy with Merkle Trees

Detect and repair inconsistencies between replicas using Merkle trees:
Each node maintains Merkle tree of its data:
Root Hash
/ \
Hash_L Hash_R
/ \ / \
Hash_LL Hash_LR Hash_RL Hash_RR
| | | |
Data Data Data Data
Range Range Range Range
Sync Process:
1. Compare root hashes
2. If different, compare children
3. Recursively find differing ranges
4. Transfer only different data
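A toy version of this comparison in Python (simplified: real systems hash whole key ranges and descend only into differing subtrees; four fixed ranges keep the example short):

import hashlib

def h(*parts):
    return hashlib.sha256("|".join(map(str, parts)).encode()).hexdigest()

def build_merkle(ranges):
    """Leaves are hashes of data ranges; each parent hashes its two children."""
    level = [h(r) for r in ranges]
    tree = [level]
    while len(level) > 1:
        level = [h(level[i], level[i + 1]) for i in range(0, len(level), 2)]
        tree.append(level)
    return tree                          # tree[-1][0] is the root hash

def differing_ranges(tree_a, tree_b):
    # Simplified: compare leaf hashes directly; a real sync starts at the root
    # and only descends into subtrees whose hashes differ.
    return [i for i, (a, b) in enumerate(zip(tree_a[0], tree_b[0])) if a != b]

replica_a = ["range0:v1", "range1:v1", "range2:v1", "range3:v1"]
replica_b = ["range0:v1", "range1:STALE", "range2:v1", "range3:v1"]
ta, tb = build_merkle(replica_a), build_merkle(replica_b)
if ta[-1][0] != tb[-1][0]:               # root hashes differ, so something is out of sync
    print(differing_ranges(ta, tb))      # [1] -> transfer only range 1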
System Architecture

🏗️ Complete System Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Client Layer │
│ (Smart routing with consistent hashing) │
└─────────┬─────────────────┬─────────────────┬─────────────────┬─┘
│ │ │ │
hash("k1")→A hash("k2")→B hash("k3")→C hash("k4")→D
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Peer-to-Peer Cluster │
│ (No dedicated coordinators) │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Node A │ │ Node B │ │ Node C │ │ Node D │ ... │
│ │ ────── │ │ ────── │ │ ────── │ │ ────── │ │
│ │ Can be │ │ Can be │ │ Can be │ │ Can be │ │
│ │Coordin- │ │Coordin- │ │Coordin- │ │Coordin- │ │
│ │ ator │ │ ator │ │ ator │ │ ator │ │
│ │ ────── │ │ ────── │ │ ────── │ │ ────── │ │
│ │ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │ │
│ │ │ LSM │ │ │ │ LSM │ │ │ │ LSM │ │ │ │ LSM │ │ │
│ │ │Tree │ │ │ │Tree │ │ │ │Tree │ │ │ │Tree │ │ │
│ │ └─────┘ │ │ └─────┘ │ │ └─────┘ │ │ └─────┘ │ │
│ │ Virtual │ │ Virtual │ │ Virtual │ │ Virtual │ │
│ │ Nodes: │ │ Nodes: │ │ Nodes: │ │ Nodes: │ │
│ │ 1,101, │ │ 51,151, │ │ 2,102, │ │ 52,152, │ │
│ │ 201... │ │ 251... │ │ 202... │ │ 252... │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ Gossip Protocol ←────────────────────────────→ Gossip │
│ ↕ ↕ │
│ Anti-entropy (Merkle Trees) for replica synchronization │
│ │
│ Example: Client requests key="user:123" │
│ 1. Client: hash("user:123") → Node B │
│ 2. Client sends request directly to Node B │
│ 3. Node B becomes coordinator for this request │
│ 4. Node B handles replication to Node C, Node D │
└─────────────────────────────────────────────────────────────────┘

📝 Client Layer
- • Client library with consistent hashing logic
- • Connection pooling to all nodes
- • Retry logic with exponential backoff
- • Optional: Request batching
🎯 Coordinator Role
- • Handles client requests
- • Manages replication
- • Enforces consistency levels
- • Coordinates failure handling
Coordinator Node: Role vs Architecture
🤔 Is the Coordinator a Special Node?
Answer: No, the "coordinator" is a role, not a special node type. Any node in the cluster can act as coordinator for a request.
How Coordinator Selection Works:
Option 1: Client-side Routing (Preferred)
┌─────────────────────────────────────────────────────────────┐
│ Client has consistent hashing logic │
│ 1. Client hashes key: hash("user:123") → Node B │
│ 2. Client sends request directly to Node B │
│ 3. Node B acts as coordinator for this request │
│ 4. Node B manages replication to other nodes │
└─────────────────────────────────────────────────────────────┘
Option 2: Server-side Routing (Alternative)
┌─────────────────────────────────────────────────────────────┐
│ Load balancer distributes requests │
│ 1. Client sends request to any node (e.g., Node A) │
│ 2. Node A hashes key: hash("user:123") → Node B │
│ 3. Node A forwards request to Node B │
│ 4. Node B acts as coordinator │
└─────────────────────────────────────────────────────────────┘
Option 3: Proxy Layer (Production systems)
┌─────────────────────────────────────────────────────────────┐
│ Dedicated proxy/gateway layer │
│ 1. Client → Proxy layer │
│ 2. Proxy hashes key and routes to correct node │
│ 3. Target node acts as coordinator │
│ 4. Used in systems like Cassandra (with drivers) │
└─────────────────────────────────────────────────────────────┘

Key Architectural Points:
✅ Every Node Can Coordinate
- • All nodes have identical code
- • All nodes know the cluster topology
- • Any node can handle any request
- • No single point of failure
- • Better load distribution
🎯 Smart Routing Optimization
- • Client routes to primary replica owner
- • Reduces network hops (no forwarding)
- • Lower latency (direct coordination)
- • Better resource utilization
- • Scales with cluster size
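A sketch of Option 1 on the client side, reusing the hypothetical HashRing from the partitioning sketch earlier; the per-node send callable stands in for a real connection pool:

class SmartClient:
    def __init__(self, ring, send):
        self.ring = ring          # HashRing from the consistent-hashing sketch
        self.send = send          # send(node, op, key, value, consistency) -> response

    def put(self, key, value, consistency="QUORUM"):
        coordinator = self.ring.owner(key)   # the primary owner coordinates this request
        return self.send(coordinator, "PUT", key, value, consistency)

    def get(self, key, consistency="QUORUM"):
        coordinator = self.ring.owner(key)
        return self.send(coordinator, "GET", key, None, consistency)

# Usage with a fake transport that just echoes the routing decision:
ring = HashRing(["node1", "node2", "node3", "node4"])
client = SmartClient(ring, send=lambda node, op, k, v, c: f"{op} {k} -> coordinator {node}")
print(client.put("user:123", {"name": "Alice"}))   # e.g. "PUT user:123 -> coordinator node4"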
Real-World Examples: How Systems Handle Client Routing
| System | Client Routing Strategy | How It Works | Trade-offs |
|---|---|---|---|
| Amazon DynamoDB | Smart Client | • AWS SDK knows partition map | ✅ Lowest latency |
| Apache Cassandra | Hybrid | • Driver can route directly (preferred) | ✅ Flexible |
| ScyllaDB | Shard-Aware | • Driver routes to specific CPU shard | ✅ Ultra-low latency |
| Apache Riak | Any Node | • Load balancer distributes randomly | ✅ Simple clients |
| Redis Cluster | MOVED Redirects | • Client connects to any node | ✅ Self-learning clients |
| MongoDB (Sharded) | Proxy Layer | • mongos proxy layer | ✅ Simple clients |
💡 Design Decision
We choose client-side routing because it is the most efficient option here: clients route directly to the node that owns the data, making that node the natural coordinator. This eliminates forwarding overhead and scales well as the cluster grows.
Write Path
✍️ Detailed Write Operation Flow
Write Operation: put("user:123", {name: "Alice", age: 30})
1. Client hashes key: hash("user:123") = 0x3F2A... → Node B
2. Request goes to coordinator (often Node B itself):
PUT /kv/user:123
{name: "Alice", age: 30}
Consistency: W=2
3. Coordinator identifies replicas (N=3):
Primary: Node B (owns the hash range)
Replica 1: Node C (next in ring)
Replica 2: Node D (next after C)
4. Parallel writes to W=2 nodes:
Coordinator
│
┌────┴────┬────────┐
▼ ▼ ▼
Node B Node C Node D
(write) (write) (async)
│ │
▼ ▼
Ack 1 Ack 2
│ │
└────┬────┘
│
▼
Client gets success
(after W=2 acks)
5. Each storage node:
a. Writes to commit log (durability)
b. Updates in-memory memtable
c. Sends acknowledgment
d. Eventually flushes to SSTable on disk
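A compressed sketch of step 5 on a storage node, assuming an LSM-style engine (write-ahead log, memtable, SSTable flush); the in-memory lists stand in for real files and the flush threshold is arbitrary:

class StorageNode:
    MEMTABLE_LIMIT = 4            # flush threshold; real engines use tens of MB

    def __init__(self):
        self.commit_log = []      # append-only write-ahead log (stand-in for a file)
        self.memtable = {}        # recent writes; a sorted structure in real engines
        self.sstables = []        # immutable "on-disk" tables, newest first

    def put(self, key, value):
        self.commit_log.append((key, value))        # a. durability first
        self.memtable[key] = value                  # b. then the in-memory memtable
        if len(self.memtable) >= self.MEMTABLE_LIMIT:
            self.flush()                            # d. flush happens off the hot path
        return "ACK"                                # c. acknowledge the coordinator

    def flush(self):
        self.sstables.insert(0, dict(sorted(self.memtable.items())))
        self.memtable.clear()

node = StorageNode()
print(node.put("user:123", {"name": "Alice", "age": 30}))   # "ACK"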
⚡ Performance Optimizations
- • Write-through cache: Update cache after successful write
- • Bloom filters: Quick negative lookups to avoid disk reads
- • Write batching: Group multiple writes to same node
- • Compression: Compress values before storage
Read Path
📖 Detailed Read Operation Flow
🚀 Performance Optimization: Our KV store uses Bloom filters to avoid unnecessary disk reads. Before checking each SSTable, we first consult its Bloom filter - if it says "definitely not present", we skip that SSTable entirely, saving expensive disk I/O.
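For intuition, here is a tiny Bloom filter sketch (real implementations size the bit array and hash count from the expected number of keys and a target false-positive rate):

import hashlib

class BloomFilter:
    def __init__(self, size_bits=1024, num_hashes=3):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = 0                                # bit array packed into one int

    def _positions(self, key):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).hexdigest()
            yield int(digest, 16) % self.size

    def add(self, key):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key):
        # False means "definitely not present" -> safe to skip this SSTable entirely
        return all(self.bits & (1 << pos) for pos in self._positions(key))

bf = BloomFilter()
bf.add("user:123")
print(bf.might_contain("user:123"))   # True
print(bf.might_contain("user:999"))   # almost certainly False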
Read Operation: get("user:123") with R=2
1. Client hashes key: hash("user:123") = 0x3F2A... → Node B
2. Request goes to coordinator:
GET /kv/user:123
Consistency: R=2
3. Coordinator queries R=2 replicas in parallel:
Coordinator
│
┌────┴────┬────────┐
▼ ▼ ▼
Node B Node C Node D
(read) (read) (skip)
│ │
▼ ▼
value v1 value v2
clock: clock:
{B:3} {B:3}
│ │
└────┬────┘
│
▼
Latest value
returned to client
4. Each storage node read process:
a. Check in-memory cache
b. If miss, check memtable
c. If miss, check bloom filters
d. If positive, read from SSTables
e. Return value with vector clock
5. Coordinator:
a. Waits for R responses
b. Compares vector clocks
c. Returns latest consistent value
d. Optional: Read repair for stale replicas
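A sketch of the per-node lookup order in step 4, reusing the hypothetical BloomFilter shape from the write-path section (each SSTable is paired with its filter):

def read_from_node(key, row_cache, memtable, sstables):
    """sstables: list of (bloom_filter, table_dict) pairs, newest first."""
    if key in row_cache:                      # a. in-memory cache
        return row_cache[key]
    if key in memtable:                       # b. memtable holds the freshest writes
        return memtable[key]
    for bloom, table in sstables:             # c./d. newest SSTable first
        if not bloom.might_contain(key):      # "definitely not here" -> skip the disk read
            continue
        if key in table:
            return table[key]                 # e. value (with its vector clock) goes back
    return None                               # key does not exist on this replica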
🔍 Read Repair
- • Detect stale replicas during read
- • Update them with latest value
- • Happens asynchronously
- • Improves eventual consistency
⚡ Cache Strategy
- • LRU cache at each node
- • Cache invalidation on writes
- • Bloom filters for negative cache
- • Hot data stays in memory
Performance & Scale
📊 Performance Characteristics
| Metric | Target | How Achieved |
|---|---|---|
| Read Latency (p99) | < 10ms | Memory cache, bloom filters, parallel reads |
| Write Latency (p99) | < 10ms | Async replication, write batching, LSM trees |
| Throughput per node | 10K ops/sec | Efficient data structures, connection pooling |
| Storage per node | 1-10 TB | Compression, compaction, tiered storage |
| Cluster size | 100s of nodes | Gossip protocol, consistent hashing |
Summary & Key Takeaways
🎯 Design Decisions Recap
1. Consistent Hashing: Minimal data movement during scaling
2. Configurable Replication: N-way replication for fault tolerance
3. Tunable Consistency: Choose CAP trade-offs per operation
4. Vector Clocks: Detect and resolve concurrent updates
5. Gossip + Anti-entropy: Handle failures and maintain consistency
6. LSM Trees: Optimize for write-heavy workloads
💡 When to Use This Design
This distributed key-value store design is ideal for applications requiring high availability, horizontal scalability, and flexible consistency. Examples include session stores, user preferences, shopping carts, and feature flags. For complex queries or strong ACID guarantees, consider traditional databases instead.
References & Deep Dives
📚 Learn More
- • CAP Theorem & Theoretical Foundations - Interactive visualizations
- • Consistent Hashing Deep Dive - Detailed implementation
- • Database Scaling Fundamentals - Replication vs Sharding
- • Storage Engines: B-Tree vs LSM Tree - Why we chose LSM
- • Amazon DynamoDB Paper - Original inspiration
- • Apache Cassandra Documentation - Production implementation