Designing a Distributed Key-Value Store

Let's design a distributed key-value store like Amazon DynamoDB or Apache Cassandra from scratch. We'll focus on practical design decisions and implementation details that make these systems work at scale.

System Requirements

Functional Requirements

  • Small key-value pairs: Less than 10 KB each
  • Big data storage: Support billions of key-value pairs
  • High availability: System responds even during failures
  • High scalability: Horizontally scalable to support large datasets
  • Automatic scaling: Add/remove servers based on traffic
  • Tunable consistency: Choose consistency level per operation
  • Low latency: Single-digit millisecond response times

API Design

A key-value store provides a simple API:

# Get value for a key
get(key) → value

# Store a key-value pair
put(key, value) → acknowledgment

# Delete a key-value pair
delete(key) → acknowledgment

📝 Design Decision

We keep the API simple. Complex queries that need joins or range scans are better served by traditional databases. This simplicity lets us optimize aggressively for speed and scale.

Core Component 1: Data Partitioning

🔄 Consistent Hashing for Data Distribution

We use consistent hashing to partition data across nodes. This minimizes data movement when nodes are added or removed.

How Consistent Hashing Works:

Hash Ring (0 to 2^32-1)

     0°/360°
   N1 ──┼── N4
    \   │   /
     \  │  /
270° ──┼── 90°
     /  │  \
    /   │   \
   N3 ──┼── N2
      180°

Key "user:123" → hash(key) = 45° → Stored on N4
Key "order:456" → hash(key) = 120° → Stored on N2

✅ Benefits

  • Adding a node moves only about 1/N of the data
  • Removing a node redistributes only that node's data
  • No central directory needed
  • Each node manages roughly 360°/N of the ring

🔧 Virtual Nodes

  • Each physical node = 100-200 virtual nodes
  • Better load distribution
  • Handles heterogeneous hardware
  • Example: Node1 → vnode1, vnode101, vnode201...
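
To make the ring and its virtual nodes concrete, here is a minimal sketch. It is not how any particular system implements its ring: the `HashRing` class, the `ring_hash` helper, the use of MD5, and the default of 150 virtual nodes per server are all assumptions chosen for illustration.

```python
import bisect
import hashlib


def ring_hash(value: str) -> int:
    """Map a string to a position on a 32-bit ring (MD5 is an arbitrary choice)."""
    return int.from_bytes(hashlib.md5(value.encode()).digest()[:4], "big")


class HashRing:
    """Illustrative consistent-hash ring with virtual nodes."""

    def __init__(self, vnodes_per_node: int = 150):
        self.vnodes_per_node = vnodes_per_node
        self._positions = []   # sorted vnode positions on the ring
        self._owner = {}       # vnode position -> physical node name

    def add_node(self, node: str) -> None:
        for i in range(self.vnodes_per_node):
            pos = ring_hash(f"{node}#vnode{i}")
            if pos in self._owner:      # very unlikely collision: skip it
                continue
            bisect.insort(self._positions, pos)
            self._owner[pos] = node

    def remove_node(self, node: str) -> None:
        self._positions = [p for p in self._positions if self._owner[p] != node]
        self._owner = {p: n for p, n in self._owner.items() if n != node}

    def get_node(self, key: str) -> str:
        """Return the owner of the first vnode clockwise from the key's position."""
        idx = bisect.bisect_left(self._positions, ring_hash(key))
        return self._owner[self._positions[idx % len(self._positions)]]


ring = HashRing()
for name in ("N1", "N2", "N3", "N4"):
    ring.add_node(name)

keys = [f"user:{i}" for i in range(10_000)]
before = {k: ring.get_node(k) for k in keys}
ring.add_node("N5")
after = {k: ring.get_node(k) for k in keys}
moved = [k for k in keys if before[k] != after[k]]
print(f"{len(moved) / len(keys):.1%} of keys moved, all of them to N5")
```

With five nodes after the addition, roughly a fifth of the keys should move, and every moved key lands on the new node. A naive `hash(key) % num_nodes` scheme would instead remap most keys.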

Core Component 2: Data Replication

📋 N-Way Replication Strategy

Each key is replicated on N nodes (typically N=3) for fault tolerance. The first N healthy nodes clockwise from the key's position store replicas.

Replication Example (N=3):

Key "user:123" hashes to 45°, so walking clockwise its replicas are N4, N2, N3

     0°/360°
   N1 ──┼── N4 ← Replica 1 (Primary)
    \   │   /   ● Key "user:123" hashes here (45°)
     \  │  /
270° ──┼── 90°
     /  │  \
    /   │   \
   N3 ──┼── N2 ← Replica 2
   ↑    │
   Replica 3
      180°

Replicas: N4 (primary), N2, N3

⚠️ Important Considerations

  • With virtual nodes, ensure replicas land on different physical servers
  • Consider rack-aware and datacenter-aware replication
  • Replicas can be synchronous or asynchronous based on consistency needs
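
The first consideration above, keeping replicas on distinct physical servers, can be sketched as a clockwise walk that skips virtual nodes whose server was already picked. The `preference_list` function and the token layout below are hypothetical, and positions are given in degrees to match the ring diagrams.

```python
import bisect


def preference_list(tokens, key_pos, n):
    """Pick n distinct physical nodes clockwise from key_pos.

    tokens: sorted list of (position, physical_node) virtual-node entries.
    """
    positions = [pos for pos, _ in tokens]
    start = bisect.bisect_left(positions, key_pos)
    replicas = []
    for step in range(len(tokens)):
        _, node = tokens[(start + step) % len(tokens)]
        if node not in replicas:     # skip vnodes of servers already chosen
            replicas.append(node)
        if len(replicas) == n:
            break
    return replicas


# Hypothetical ring: server A happens to own two adjacent virtual nodes.
tokens = [(45, "A"), (90, "A"), (135, "B"), (225, "C"), (315, "D")]
print(preference_list(tokens, key_pos=30, n=3))   # ['A', 'B', 'C']
```

Without the skip, the key at 30° would get two of its three replicas on server A, so losing that one machine would take out two copies at once. Rack or datacenter awareness would extend the same check from "different server" to "different rack".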

Core Component 3: Consistency Models

⚖️ Tunable Consistency with Quorums

We use quorum-based consistency where clients can choose consistency levels per operation.

Quorum Formula:

N = Number of replicas
W = Write quorum (nodes that must acknowledge write)
R = Read quorum (nodes that must respond to read)

W + R > N ⇒ Strong consistency (every read quorum overlaps every write quorum)

Common Configurations:

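┌──────────────────────┬───────────────┬───────────────────────────────┬────────────────────────┐
│ Configuration        │ N, W, R       │ Consistency                   │ Use Case               │
├──────────────────────┼───────────────┼───────────────────────────────┼────────────────────────┤
│ Strong Consistency   │ N=3, W=3, R=1 │ All writes go to all replicas │ Financial transactions │
│ Read Heavy           │ N=3, W=3, R=1 │ Fast reads, slow writes       │ Product catalog        │
│ Write Heavy          │ N=3, W=1, R=3 │ Fast writes, slow reads       │ Logging, metrics       │
│ Balanced             │ N=3, W=2, R=2 │ W+R>N ensures consistency     │ User sessions          │
│ Eventual Consistency │ N=3, W=1, R=1 │ Fastest, may read stale data  │ Social media posts     │
└──────────────────────┴───────────────┴───────────────────────────────┴────────────────────────┘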
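
Why does W + R > N matter? If the write quorum and the read quorum together exceed N, any set of R replicas must share at least one member with any set of W replicas, so a read always touches a replica that saw the latest acknowledged write. The sketch below checks this by brute force for the configurations above; the function names are made up for illustration.

```python
from itertools import combinations


def quorums_overlap(n: int, w: int, r: int) -> bool:
    """The quorum rule: reads see the latest acknowledged write when W + R > N."""
    return w + r > n


def always_intersect(n: int, w: int, r: int) -> bool:
    """Brute force: does every possible write set share a replica with every read set?"""
    replicas = range(n)
    return all(
        set(write_set) & set(read_set)
        for write_set in combinations(replicas, w)
        for read_set in combinations(replicas, r)
    )


configs = {
    "Read Heavy": (3, 3, 1),
    "Write Heavy": (3, 1, 3),
    "Balanced": (3, 2, 2),
    "Eventual Consistency": (3, 1, 1),
}
for name, (n, w, r) in configs.items():
    print(f"{name}: rule={quorums_overlap(n, w, r)}, brute force={always_intersect(n, w, r)}")
```

Only the W=1, R=1 configuration fails the check: a read can hit a replica that the single acknowledged write never reached.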

Coordinator Decision Flow

🧠 How Coordinator Enforces Consistency Levels

The coordinator node receives client requests with consistency requirements and orchestrates the appropriate read/write strategy.

📌 Important: The "coordinator" is not a special node type - it's a role that any regular cluster node can take on for a specific request. See the architecture section below for details.

Coordinator Decision Process:

Client Request: PUT key="user:456" value="data" consistency_level=QUORUM

Coordinator Logic:
1. Parse consistency level → W=2, R=2 (for N=3)
2. Hash key → determine replicas [Node A, Node B, Node C]
3. Execute write strategy based on consistency level

┌─────────────────────────────────────────────────────────────┐
│                    WRITE CONSISTENCY FLOWS                 │
└─────────────────────────────────────────────────────────────┘

EVENTUAL (W=1):                QUORUM (W=2):               STRONG (W=3):
Client                         Client                      Client
   │                             │                           │
   ▼                             ▼                           ▼
Coordinator                    Coordinator                 Coordinator
   │                             │                           │
   └──── Write to ANY 1 ────     ├──── Write to ANY 2 ──    ├──── Write to ALL 3 ──
        replica (fastest)        │     replicas (majority)   │     replicas (all)
                                 │                           │
Node A  Node B  Node C          Node A  Node B  Node C      Node A  Node B  Node C
  ✓      -       -                ✓      ✓       -            ✓      ✓       ✓
  │                                │      │                    │      │       │
  ▼                                ▼      ▼                    ▼      ▼       ▼
ACK (fast)                       ACK    ACK                  ACK    ACK     ACK
  │                              (wait for 2)              (wait for all 3)
  ▼                                │                           │
SUCCESS                            ▼                           ▼
(may be inconsistent)            SUCCESS                    SUCCESS
                                (consistent)               (strongly consistent)


┌─────────────────────────────────────────────────────────────┐
│                    READ CONSISTENCY FLOWS                  │
└─────────────────────────────────────────────────────────────┘

EVENTUAL (R=1):                QUORUM (R=2):               STRONG (R=3):
Client                         Client                      Client
   │                             │                           │
   ▼                             ▼                           ▼
Coordinator                    Coordinator                 Coordinator
   │                             │                           │
   └──── Read from ANY 1 ────    ├──── Read from ANY 2 ──    ├──── Read from ALL 3 ──
        replica (fastest)        │     replicas (majority)   │     replicas (all)
                                 │                           │
Node A  Node B  Node C          Node A  Node B  Node C      Node A  Node B  Node C
  ✓      -       -                ✓      ✓       -            ✓      ✓       ✓
  │                                │      │                    │      │       │
  ▼                                ▼      ▼                    ▼      ▼       ▼
value1                         value1  value2              value1  value2  value3
clock:{A:5}                    clock:  clock:              clock:  clock:  clock:
                               {A:5}   {A:5}               {A:5}   {A:5}   {A:5}
  │                                │      │                    │      │       │
  ▼                                └──────┼──────────────────────┴───────────┘
return value1                             ▼                           │
(may be stale)                    compare clocks                      ▼
                                  return latest                  compare all clocks
                                  (read repair async)            return latest value
                                                                (read repair async)

Coordinator Algorithm for Different Consistency Levels:

# Helper functions (get_replicas_for_key, async_write_to_all, wait_for_acks,
# async_read_from_any, async_read_from_all, wait_for_responses,
# resolve_conflicts, async_read_repair) and the SUCCESS constant are assumed
# to be provided by the rest of the node implementation.

def required_responses(consistency_level, n):
    """Map a consistency level to the number of replicas that must respond."""
    if consistency_level == "ONE":
        return 1
    if consistency_level == "QUORUM":
        return (n // 2) + 1  # For N=3, quorum is 2
    if consistency_level == "ALL":
        return n  # All replicas must respond
    raise ValueError(f"unknown consistency level: {consistency_level}")


def handle_write_request(key, value, consistency_level):
    replicas = get_replicas_for_key(key)  # [NodeA, NodeB, NodeC]
    w = required_responses(consistency_level, len(replicas))

    # Send the write to every replica, but block only until W of them
    # acknowledge; the remaining replicas finish in the background.
    future_writes = async_write_to_all(replicas, key, value)
    wait_for_acks(future_writes, count=w)
    return SUCCESS


def handle_read_request(key, consistency_level):
    replicas = get_replicas_for_key(key)
    r = required_responses(consistency_level, len(replicas))

    if r == 1:
        # Read from any one replica (fastest, may be stale)
        return async_read_from_any(replicas, key)

    # QUORUM and ALL: wait for R responses and pick the latest version
    responses = async_read_from_all(replicas, key)
    received = wait_for_responses(responses, count=r)
    latest_value = resolve_conflicts(received)
    async_read_repair(replicas, latest_value)  # Fix stale replicas
    return latest_value

🚀 Eventual Consistency

  • Write: Wait for 1 ACK (fastest)
  • Read: Return from 1 replica
  • Latency: Lowest
  • Use case: Social feeds, counters

⚖️ Quorum Consistency

  • Write: Wait for majority ACKs
  • Read: Read from majority, compare
  • Latency: Medium
  • Use case: User profiles, preferences

🔒 Strong Consistency

  • Write: Wait for ALL ACKs
  • Read: Read from ALL replicas
  • Latency: Highest
  • Use case: Financial data, inventory

Core Component 4: Inconsistency Resolution

🔄 Vector Clocks for Conflict Resolution

When concurrent writes occur, we need to detect and resolve conflicts. Vector clocks track causality between events.

💡 CAP Theorem Connection: Conflict resolution is needed for high availability (AP systems), where we accept writes during partitions. In strongly consistent (CP) systems, we reject writes when we can't reach a quorum, which prevents conflicts but reduces availability.

Vector Clock Example:

Initial: key="cart:user1" value=[] vector_clock={}

Client A writes:
key="cart:user1" value=["item1"] vector_clock={A:1}

Client B reads the initial value, then writes (concurrent with A):
key="cart:user1" value=["item2"] vector_clock={B:1}

Conflict Detection:
{A:1} and {B:1} are concurrent (neither happened-before the other)

Resolution:
1. Last-Write-Wins: Keep the write with the newest timestamp; the other write is lost
2. Application-level: Merge carts ["item1", "item2"]
3. CRDTs: Automatically mergeable data structures
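
The conflict detection step in the cart example can be sketched in a few lines. The `compare` and `merge_clocks` functions are illustrative names, and clocks are modeled as plain dicts from writer ID to counter, as in the example.

```python
def compare(a: dict, b: dict) -> str:
    """Return how clock a relates to clock b: before, after, equal, or concurrent."""
    writers = set(a) | set(b)
    a_le_b = all(a.get(w, 0) <= b.get(w, 0) for w in writers)
    b_le_a = all(b.get(w, 0) <= a.get(w, 0) for w in writers)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"       # a happened-before b, so b supersedes a
    if b_le_a:
        return "after"
    return "concurrent"       # neither dominates: a real conflict


def merge_clocks(a: dict, b: dict) -> dict:
    """Clock for a merged value: the per-writer maximum of both clocks."""
    return {w: max(a.get(w, 0), b.get(w, 0)) for w in set(a) | set(b)}


cart_a, clock_a = ["item1"], {"A": 1}
cart_b, clock_b = ["item2"], {"B": 1}

if compare(clock_a, clock_b) == "concurrent":
    # Application-level resolution: merge the two carts
    merged_cart = sorted(set(cart_a) | set(cart_b))
    merged_clock = merge_clocks(clock_a, clock_b)
    print(merged_cart, merged_clock)
```

Had Client B read A's write first, its clock would have been {A:1, B:1}, which dominates {A:1}, and no merge would be needed.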

🕐 Last-Write-Wins

  • Simple implementation
  • Based on synchronized timestamps
  • Potential data loss
  • Good for cache-like data

🔀 Application Resolution

  • Client resolves conflicts
  • Domain-specific logic
  • No data loss
  • Example: Shopping cart merge

Core Component 5: Handling Failures

🛡️ Failure Detection & Recovery

1. Failure Detection: Gossip Protocol

Every T seconds, each node:

  1. Increments its heartbeat counter
  2. Randomly selects another node
  3. Shares its membership list with heartbeats
  4. Updates local membership list

A node is marked as failed if its heartbeat has not increased for T_fail seconds

Example membership list at Node A:
┌─────────┬───────────┬──────────────┐
│  Node   │ Heartbeat │ Last Updated │
├─────────┼───────────┼──────────────┤
│ Node A  │    142    │     Now      │
│ Node B  │    140    │   2 sec ago  │
│ Node C  │    135    │   7 sec ago  │ ← Might be failing
│ Node D  │    141    │   1 sec ago  │
└─────────┴───────────┴──────────────┘

🔍 Deep Dive: How Gossip Protocol Really Works

📊 Step-by-Step Gossip Example

Initial State (4 nodes):
A: [0,0,0,0] B: [0,0,0,0] C: [0,0,0,0] D: [0,0,0,0] (down)
After heartbeat increments:
A: [1,0,0,0] B: [0,1,0,0] C: [0,0,1,0] D: still down
A gossips to B:
B receives [1,0,0,0], merges with [0,1,0,0]
B: [max(1,0), max(0,1), max(0,0), max(0,0)] = [1,1,0,0] ✓
B gossips to C:
C receives [1,1,0,0], merges with [0,0,1,0]
C: [max(1,0), max(1,0), max(0,1), max(0,0)] = [1,1,1,0] ✓
After several rounds:
All nodes: [1,1,1,0] → D's counter hasn't increased → suspected failure!

⏱️ Heartbeat Timing

Time 0: A: [0,0,0,0]
Time 1: A: [1,0,0,0]
Time 2: A: [2,0,0,0]
Time 3: A: [3,0,0,0]
Each node increments its own slot every second

🔀 Random Gossip Selection

Each gossip round (every 1-2 seconds):
• Pick 1-2 random nodes to share with
• Information spreads exponentially:
1 → 2 → 4 → 8 → all nodes
Ensures fast, fault-tolerant spread

🧮 Merge Rule Details

When Node X receives gossip from Node Y:

for i in range(num_nodes):
  local[i] = max(local[i], received[i])

Always take the higher heartbeat value

💀 Failure Detection Logic

After 10 seconds (Node A's view):
Healthy: [10,9,10,8] (all incrementing; remote counters may lag slightly)
Failed: [10,9,10,0] (D stuck at 0)
If a counter has not increased for T_fail seconds → node is marked down
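
Putting the heartbeat, merge, and failure detection rules together, a small simulation might look like the sketch below. The `Membership` class, the 5-second `t_fail`, and the fixed random seed are assumptions for illustration; a real implementation would run on timers and network messages rather than a loop.

```python
import random


class Membership:
    """One node's view of the cluster: node -> [heartbeat, time it last increased]."""

    def __init__(self, name, all_nodes, t_fail=5):
        self.name = name
        self.t_fail = t_fail
        self.table = {node: [0, 0] for node in all_nodes}

    def tick(self, now):
        """Increment our own heartbeat counter."""
        self.table[self.name] = [self.table[self.name][0] + 1, now]

    def merge(self, remote_table, now):
        """Keep the higher heartbeat per node; refresh the timestamp when it grows."""
        for node, (heartbeat, _) in remote_table.items():
            if heartbeat > self.table[node][0]:
                self.table[node] = [heartbeat, now]

    def suspected(self, now):
        """Nodes whose heartbeat has not increased for more than t_fail seconds."""
        return sorted(
            node for node, (_, last_seen) in self.table.items()
            if node != self.name and now - last_seen > self.t_fail
        )


rng = random.Random(42)
names = ["A", "B", "C", "D"]
alive = {name: Membership(name, names) for name in "ABC"}   # D never starts

for now in range(1, 11):                  # ten one-second rounds
    for node in alive.values():
        node.tick(now)
    for node in alive.values():
        target = rng.choice([n for n in names if n != node.name])
        if target in alive:               # gossip sent to D is simply lost
            alive[target].merge(node.table, now)

for node in alive.values():
    print(node.name, "suspects", node.suspected(now=10))
```

Because D's counter never moves past 0, every live node ends up suspecting it once `t_fail` seconds have passed, without any node ever contacting a central monitor.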

🚀 Why Gossip Protocol is Brilliant

Scalability
O(log N) rounds to reach all N nodes. Works with thousands of nodes.
Fault Tolerance
Even if some gossip messages fail, information still spreads through multiple paths.
No Central Point
No coordinator needed. Every node acts independently, yet all nodes converge on the same view of cluster membership.

2. Temporary Failures: Sloppy Quorum & Hinted Handoff

When a node is temporarily unavailable, a sloppy quorum lets the next healthy node on the ring stand in for it, and hinted handoff returns the data once the node recovers:

Normal: key "K" → replicas on [N1, N2, N3]
N3 is down temporarily

Write arrives:
1. Write to N1, N2 (meet quorum W=2)
2. N4 temporarily holds N3's write with hint
3. When N3 recovers, N4 transfers hinted writes

┌────┐  ┌────┐  ┌────┐  ┌────┐
│ N1 │  │ N2 │  │ N3 │  │ N4 │
│ ✓  │  │ ✓  │  │ ✗  │  │hint│
└────┘  └────┘  └────┘  └────┘
                   ↑       ↓
               Down temporarily
               Hint: "Give to N3 when back"
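
The handoff steps above can be sketched with a few in-memory objects. Everything here (`Node`, `sloppy_write`, `replay_hints`) is a hypothetical illustration. Whether a stored hint counts toward W is a policy choice; in this sketch it does not, matching the example where N1 and N2 alone meet W=2.

```python
class Node:
    def __init__(self, name):
        self.name = name
        self.up = True
        self.data = {}
        self.hints = []        # (intended_owner_name, key, value)


def sloppy_write(key, value, preferred, fallbacks, w):
    """Write to healthy preferred replicas; park writes for down ones as hints."""
    acks = 0
    spare = iter(fallbacks)
    for target in preferred:
        if target.up:
            target.data[key] = value
            acks += 1
        else:
            stand_in = next((n for n in spare if n.up), None)
            if stand_in is not None:
                # Hint: "give this to target when it is back"
                stand_in.hints.append((target.name, key, value))
    return acks >= w


def replay_hints(holder, cluster):
    """Deliver parked writes to owners that have recovered; keep the rest."""
    remaining = []
    for owner, key, value in holder.hints:
        if cluster[owner].up:
            cluster[owner].data[key] = value
        else:
            remaining.append((owner, key, value))
    holder.hints = remaining


n1, n2, n3, n4 = (Node(f"N{i}") for i in range(1, 5))
cluster = {n.name: n for n in (n1, n2, n3, n4)}

n3.up = False
ok = sloppy_write("K", "v1", preferred=[n1, n2, n3], fallbacks=[n4], w=2)
print("write succeeded:", ok, "| hints on N4:", n4.hints)

n3.up = True
replay_hints(n4, cluster)
print("N3 now has:", n3.data, "| hints on N4:", n4.hints)
```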

3. Permanent Failures: Anti-Entropy with Merkle Trees

Detect and repair inconsistencies between replicas using Merkle trees:

Each node maintains Merkle tree of its data:

              Root Hash
             /          \
        Hash_L           Hash_R
       /      \         /      \
    Hash_LL Hash_LR  Hash_RL Hash_RR
      |       |        |       |
    Data    Data     Data    Data
    Range   Range    Range   Range

Sync Process:
1. Compare root hashes
2. If different, compare children
3. Recursively find differing ranges
4. Transfer only different data
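
The sync process can be sketched as a recursive comparison that descends only into subtrees whose hashes differ. This is a simplified illustration: `build_tree` and `diff_ranges` are made-up names, SHA-256 is an arbitrary choice, and the number of key ranges is assumed to be a power of two.

```python
import hashlib


def h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def build_tree(leaves):
    """Build a Merkle tree bottom-up.

    leaves: list of bytes, one serialized blob per key range.
    Returns a list of levels: levels[0] holds leaf hashes, levels[-1] the root.
    """
    level = [h(leaf) for leaf in leaves]
    levels = [level]
    while len(level) > 1:
        level = [h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
        levels.append(level)
    return levels


def diff_ranges(a, b, depth=None, index=0):
    """Return indexes of leaf ranges that differ between two trees."""
    if depth is None:
        depth = len(a) - 1                  # start at the root
    if a[depth][index] == b[depth][index]:
        return []                           # identical subtree: nothing to transfer
    if depth == 0:
        return [index]                      # differing leaf range found
    return (diff_ranges(a, b, depth - 1, 2 * index)
            + diff_ranges(a, b, depth - 1, 2 * index + 1))


replica_a = [b"range0-v1", b"range1-v1", b"range2-v1", b"range3-v1"]
replica_b = [b"range0-v1", b"range1-v1", b"range2-STALE", b"range3-v1"]

tree_a, tree_b = build_tree(replica_a), build_tree(replica_b)
print("ranges to transfer:", diff_ranges(tree_a, tree_b))   # [2]
```

Only range 2 needs to cross the network; the matching left subtree is ruled out with a single hash comparison.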

System Architecture

🏗️ Complete System Architecture

┌─────────────────────────────────────────────────────────────────┐
│                     Client Layer                                │
│              (Smart routing with consistent hashing)           │
└─────────┬─────────────────┬─────────────────┬─────────────────┬─┘
          │                 │                 │                 │
      hash("k1")→A      hash("k2")→B      hash("k3")→C      hash("k4")→D
          │                 │                 │                 │
          ▼                 ▼                 ▼                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                      Peer-to-Peer Cluster                      │
│                   (No dedicated coordinators)                  │
│                                                                 │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐           │
│  │ Node A  │  │ Node B  │  │ Node C  │  │ Node D  │  ...      │
│  │ ──────  │  │ ──────  │  │ ──────  │  │ ──────  │           │
│  │ Can be  │  │ Can be  │  │ Can be  │  │ Can be  │           │
│  │Coordin- │  │Coordin- │  │Coordin- │  │Coordin- │           │
│  │ ator    │  │ ator    │  │ ator    │  │ ator    │           │
│  │ ──────  │  │ ──────  │  │ ──────  │  │ ──────  │           │
│  │ ┌─────┐ │  │ ┌─────┐ │  │ ┌─────┐ │  │ ┌─────┐ │           │
│  │ │ LSM │ │  │ │ LSM │ │  │ │ LSM │ │  │ │ LSM │ │           │
│  │ │Tree │ │  │ │Tree │ │  │ │Tree │ │  │ │Tree │ │           │
│  │ └─────┘ │  │ └─────┘ │  │ └─────┘ │  │ └─────┘ │           │
│  │ Virtual │  │ Virtual │  │ Virtual │  │ Virtual │           │
│  │ Nodes:  │  │ Nodes:  │  │ Nodes:  │  │ Nodes:  │           │
│  │ 1,101,  │  │ 51,151, │  │ 2,102,  │  │ 52,152, │           │
│  │ 201...  │  │ 251...  │  │ 202...  │  │ 252...  │           │
│  └─────────┘  └─────────┘  └─────────┘  └─────────┘           │
│                                                                 │
│  Gossip Protocol ←────────────────────────────→ Gossip        │
│       ↕                                            ↕           │
│  Anti-entropy (Merkle Trees) for replica synchronization       │
│                                                                 │
│  Example: Client requests key="user:123"                      │
│  1. Client: hash("user:123") → Node B                         │
│  2. Client sends request directly to Node B                   │
│  3. Node B becomes coordinator for this request               │
│  4. Node B handles replication to Node C, Node D             │
└─────────────────────────────────────────────────────────────────┘

📝 Client Layer

  • Client library with consistent hashing logic
  • Connection pooling to all nodes
  • Retry logic with exponential backoff
  • Optional: Request batching

🎯 Coordinator Role

  • Handles client requests
  • Manages replication
  • Enforces consistency levels
  • Coordinates failure handling

Coordinator Node: Role vs Architecture

🤔 Is the Coordinator a Special Node?

Answer: No, the "coordinator" is a role, not a special node type. Any node in the cluster can act as coordinator for a request.

How Coordinator Selection Works:

Option 1: Client-side Routing (Preferred)
┌─────────────────────────────────────────────────────────────┐
│ Client has consistent hashing logic                        │
│ 1. Client hashes key: hash("user:123") → Node B            │
│ 2. Client sends request directly to Node B                 │
│ 3. Node B acts as coordinator for this request            │
│ 4. Node B manages replication to other nodes              │
└─────────────────────────────────────────────────────────────┘

Option 2: Server-side Routing (Alternative)
┌─────────────────────────────────────────────────────────────┐
│ Load balancer distributes requests                         │
│ 1. Client sends request to any node (e.g., Node A)        │
│ 2. Node A hashes key: hash("user:123") → Node B           │
│ 3. Node A forwards request to Node B                      │
│ 4. Node B acts as coordinator                             │
└─────────────────────────────────────────────────────────────┘

Option 3: Proxy Layer (Production systems)
┌─────────────────────────────────────────────────────────────┐
│ Dedicated proxy/gateway layer                              │
│ 1. Client → Proxy layer                                   │
│ 2. Proxy hashes key and routes to correct node           │
│ 3. Target node acts as coordinator                        │
│ 4. Used in systems like sharded MongoDB (mongos)         │
└─────────────────────────────────────────────────────────────┘

Key Architectural Points:

✅ Every Node Can Coordinate

  • All nodes have identical code
  • All nodes know the cluster topology
  • Any node can handle any request
  • No single point of failure
  • Better load distribution

🎯 Smart Routing Optimization

  • Client routes to primary replica owner
  • Reduces network hops (no forwarding)
  • Lower latency (direct coordination)
  • Better resource utilization
  • Scales with cluster size

Real-World Examples: How Systems Handle Client Routing

Each system below is listed with its client routing strategy, how it works, and its trade-offs.

Amazon Dynamo (original design)
Client Routing Strategy: Smart Client
How It Works:
  • Partition-aware client library knows the partition map
  • Routes directly to correct node
  • No intermediate hops
  • Client periodically refreshes its view of the topology
Trade-offs:
  ✅ Lowest latency
  ❌ Complex client logic

Apache Cassandra
Client Routing Strategy: Hybrid
How It Works:
  • Driver can route directly (preferred)
  • OR connect to any node
  • Non-replica nodes forward requests to the replicas
  • Uses token ring awareness
Trade-offs:
  ✅ Flexible
  ✅ Graceful degradation
  ❌ Extra hop when forwarding

ScyllaDB
Client Routing Strategy: Shard-Aware
How It Works:
  • Driver routes to specific CPU shard
  • Sub-node level routing (not just node)
  • Eliminates cross-shard coordination
  • Maximum parallelism
Trade-offs:
  ✅ Ultra-low latency
  ✅ CPU-aware
  ❌ Driver complexity

Riak
Client Routing Strategy: Any Node
How It Works:
  • Load balancer distributes randomly
  • Any node can coordinate any request
  • Internal forwarding if needed
  • Simple client implementation
Trade-offs:
  ✅ Simple clients
  ✅ High availability
  ❌ Potential extra hops

Redis Cluster
Client Routing Strategy: MOVED Redirects
How It Works:
  • Client connects to any node
  • Node returns MOVED if it does not own the key's hash slot
  • Client learns topology and caches
  • Eventually routes directly
Trade-offs:
  ✅ Self-learning clients
  ✅ Topology discovery
  ❌ Initial redirect overhead

MongoDB (Sharded)
Client Routing Strategy: Proxy Layer
How It Works:
  • mongos proxy layer
  • Clients connect to mongos
  • mongos routes to correct shards
  • Aggregates results from multiple shards
Trade-offs:
  ✅ Simple clients
  ✅ Complex queries
  ❌ Proxy bottleneck

💡 Design Decision

We choose client-side routing because it avoids extra hops: clients route directly to the node that owns the data, making that node the natural coordinator. This eliminates forwarding overhead, and the saving holds as the cluster grows. The cost is that every client must keep an up-to-date view of the ring.

Write Path

✍️ Detailed Write Operation Flow

Write Operation: put("user:123", {name: "Alice", age: 30})

1. Client hashes key: hash("user:123") = 0x3F2A... → Node B

2. Request goes to coordinator (often Node B itself):
   PUT /kv/user:123
   {name: "Alice", age: 30}
   Consistency: W=2

3. Coordinator identifies replicas (N=3):
   Primary: Node B (owns the hash range)
   Replica 1: Node C (next in ring)
   Replica 2: Node D (next after C)

4. Parallel writes to all N=3 replicas; success after W=2 acks:
   
   Coordinator
   ┌────┴────┬────────┐
   ▼         ▼        ▼
Node B    Node C   Node D
(write)   (write)  (async)
   │         │        
   ▼         ▼        
 Ack 1     Ack 2   
   │         │
   └────┬────┘
   Client gets success
   (after W=2 acks)

5. Each storage node:
   a. Writes to commit log (durability)
   b. Updates in-memory memtable
   c. Sends acknowledgment
   d. Eventually flushes to SSTable on disk
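
Step 5 can be sketched as a toy storage node. This is a deliberately simplified model, not a real LSM engine: the commit log is a Python list standing in for an append-only file, SSTables are in-memory sorted lists, and the flush runs inline instead of in the background. The `StorageNode` class and its `memtable_limit` parameter are illustrative assumptions.

```python
class StorageNode:
    def __init__(self, memtable_limit=3):
        self.commit_log = []      # stand-in for an append-only file on disk
        self.memtable = {}        # in-memory writes, newest value per key
        self.sstables = []        # immutable sorted runs, oldest first
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.commit_log.append((key, value))    # a. log first, for durability
        self.memtable[key] = value              # b. update the memtable
        if len(self.memtable) >= self.memtable_limit:
            self._flush()                       # d. flush (inline here for brevity)
        return "ACK"                            # c. acknowledge the write

    def _flush(self):
        """Write the memtable out as a sorted, immutable SSTable."""
        self.sstables.append(sorted(self.memtable.items()))
        self.memtable = {}
        self.commit_log.clear()   # flushed entries no longer need replay

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for table in reversed(self.sstables):   # newest SSTable wins
            for k, v in table:
                if k == key:
                    return v
        return None


node = StorageNode(memtable_limit=3)
for k, v in [("a", 1), ("b", 2), ("c", 3)]:
    node.put(k, v)                 # third put triggers a flush
node.put("a", 10)                  # newer value lives in the memtable
print(node.get("a"), node.get("b"), len(node.sstables))   # 10 2 1
```

Writes never modify an existing SSTable, which is why this layout favors write-heavy workloads: every write is an append plus an in-memory update.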

⚡ Performance Optimizations

  • Write-through cache: Update cache after successful write
  • Bloom filters: Quick negative lookups to avoid disk reads
  • Write batching: Group multiple writes to same node
  • Compression: Compress values before storage

Read Path

📖 Detailed Read Operation Flow

🚀 Performance Optimization: Our KV store uses Bloom filters to avoid unnecessary disk reads. Before checking each SSTable, we first consult its Bloom filter - if it says "definitely not present", we skip that SSTable entirely, saving expensive disk I/O.

Read Operation: get("user:123") with R=2

1. Client hashes key: hash("user:123") = 0x3F2A... → Node B

2. Request goes to coordinator:
   GET /kv/user:123
   Consistency: R=2

3. Coordinator queries R=2 replicas in parallel:
   
   Coordinator
   ┌────┴────┬────────┐
   ▼         ▼        ▼
Node B    Node C   Node D
(read)    (read)   (skip)
   │         │        
   ▼         ▼        
value v1  value v2
clock:    clock:
{B:3}     {B:3}
   │         │
   └────┬────┘
   Latest value
   returned to client

4. Each storage node read process:
   a. Check in-memory cache
   b. If miss, check memtable
   c. If miss, check bloom filters
   d. If positive, read from SSTables
   e. Return value with vector clock

5. Coordinator:
   a. Waits for R responses
   b. Compares vector clocks
   c. Returns latest consistent value
   d. Optional: Read repair for stale replicas
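
The Bloom filter check in step 4 can be sketched as follows. The `BloomFilter` class, its sizes, and the `read_from_sstables` helper are illustrative assumptions; production filters pack bits tightly and size themselves from a target false-positive rate.

```python
import hashlib


class BloomFilter:
    def __init__(self, size_bits=1024, num_hashes=3):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits)   # one byte per bit, for clarity

    def _positions(self, key):
        # Derive num_hashes positions by salting SHA-256 with a seed
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key):
        """False means definitely absent; True means possibly present."""
        return all(self.bits[pos] for pos in self._positions(key))


def read_from_sstables(key, sstables):
    """sstables: list of (bloom_filter, table_dict) pairs, newest first."""
    for bloom, table in sstables:
        if not bloom.might_contain(key):
            continue                 # definitely absent: skip the disk read
        if key in table:             # stands in for the actual disk lookup
            return table[key]
    return None


def make_sstable(items):
    bloom = BloomFilter()
    for key in items:
        bloom.add(key)
    return bloom, dict(items)


sstables = [make_sstable({"user:123": "Alice"}), make_sstable({"user:456": "Bob"})]
print(read_from_sstables("user:456", sstables))
print(read_from_sstables("user:999", sstables))
```

A Bloom filter never returns a false negative, so skipping an SSTable is always safe. A false positive only costs one unnecessary lookup.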

🔍 Read Repair

  • Detect stale replicas during read
  • Update them with latest value
  • Happens asynchronously
  • Improves eventual consistency

⚡ Cache Strategy

  • LRU cache at each node
  • Cache invalidation on writes
  • Bloom filters for fast negative lookups
  • Hot data stays in memory
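
A per-node LRU cache with invalidation on writes might look like the sketch below. It uses `collections.OrderedDict` from the standard library; the `LRUCache` class and its capacity are assumptions for illustration.

```python
from collections import OrderedDict


class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self._items = OrderedDict()     # least recently used entry first

    def get(self, key):
        if key not in self._items:
            return None
        self._items.move_to_end(key)    # mark as most recently used
        return self._items[key]

    def put(self, key, value):
        self._items[key] = value
        self._items.move_to_end(key)
        if len(self._items) > self.capacity:
            self._items.popitem(last=False)   # evict the least recently used

    def invalidate(self, key):
        """Call on every write so readers never see the old cached value."""
        self._items.pop(key, None)


cache = LRUCache(capacity=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")            # "a" is now hotter than "b"
cache.put("c", 3)         # evicts "b"
print(cache.get("a"), cache.get("b"), cache.get("c"))   # 1 None 3
```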

Performance & Scale

📊 Performance Characteristics

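┌─────────────────────┬───────────────┬───────────────────────────────────────────────┐
│ Metric              │ Target        │ How Achieved                                  │
├─────────────────────┼───────────────┼───────────────────────────────────────────────┤
│ Read Latency (p99)  │ < 10 ms       │ Memory cache, Bloom filters, parallel reads   │
│ Write Latency (p99) │ < 10 ms       │ Async replication, write batching, LSM trees  │
│ Throughput per node │ 10K ops/sec   │ Efficient data structures, connection pooling │
│ Storage per node    │ 1-10 TB       │ Compression, compaction, tiered storage       │
│ Cluster size        │ 100s of nodes │ Gossip protocol, consistent hashing           │
└─────────────────────┴───────────────┴───────────────────────────────────────────────┘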

Summary & Key Takeaways

🎯 Design Decisions Recap

  1. Consistent Hashing: Minimal data movement during scaling
  2. Configurable Replication: N-way replication for fault tolerance
  3. Tunable Consistency: Choose CAP trade-offs per operation
  4. Vector Clocks: Detect and resolve concurrent updates
  5. Gossip + Anti-entropy: Handle failures and maintain consistency
  6. LSM Trees: Optimize for write-heavy workloads

💡 When to Use This Design

This distributed key-value store design is ideal for applications requiring high availability, horizontal scalability, and flexible consistency. Examples include session stores, user preferences, shopping carts, and feature flags. For complex queries or strong ACID guarantees, consider traditional databases instead.
