Skip to content

Intermediate

This section covers intermediate system design concepts through 29 annotated examples (Examples 29-57). Each example demonstrates a production-relevant pattern with working code in Go and Python, diagrams, and explanations targeted at engineers who already understand basic distributed systems primitives.

Message Queues

Example 29: Publish-Subscribe Pattern with Message Queue

Pub/sub decouples producers from consumers: a producer publishes a message to a topic, and all subscribers to that topic receive a copy independently. This differs from point-to-point queues where exactly one consumer receives each message. Pub/sub enables fan-out — one event notifying many downstream services simultaneously.

    graph LR
    P["Producer<br/>OrderService"]:::blue
    T["Topic<br/>order.created"]:::orange
    S1["Subscriber<br/>EmailService"]:::teal
    S2["Subscriber<br/>InventoryService"]:::teal
    S3["Subscriber<br/>AnalyticsService"]:::teal

    P -->|publish| T
    T -->|fan-out copy| S1
    T -->|fan-out copy| S2
    T -->|fan-out copy| S3

    classDef blue fill:#0173B2,stroke:#000,color:#fff
    classDef orange fill:#DE8F05,stroke:#000,color:#fff
    classDef teal fill:#029E73,stroke:#000,color:#fff
  
package main

import (
    "encoding/json"
    "fmt"
    "sync"
    "time"
)

// PubSubBroker: central coordinator holding topic -> subscriber mappings
type PubSubBroker struct {
    subscribers map[string][]func(map[string]interface{})
    mu          sync.Mutex
}

func NewPubSubBroker() *PubSubBroker {
    return &PubSubBroker{
        subscribers: make(map[string][]func(map[string]interface{})),
        // => subscribers is {} initially
    }
}

func (b *PubSubBroker) Subscribe(topic string, handler func(map[string]interface{})) {
    // Acquire lock before mutating shared subscriber map
    // => prevents race conditions when multiple services subscribe concurrently
    b.mu.Lock()
    defer b.mu.Unlock()
    b.subscribers[topic] = append(b.subscribers[topic], handler)
    // => handler is appended to topic's subscriber list
    fmt.Printf("[BROKER] Subscribed handler to topic '%s'\n", topic)
}

func (b *PubSubBroker) Publish(topic string, message map[string]interface{}) {
    // Snapshot subscriber list under lock to avoid holding lock during handler execution
    // => handlers can themselves call Subscribe without deadlocking
    b.mu.Lock()
    handlers := make([]func(map[string]interface{}), len(b.subscribers[topic]))
    copy(handlers, b.subscribers[topic])
    // => handlers is a shallow copy of the list at publish time
    b.mu.Unlock()

    jsonBytes, _ := json.Marshal(message)
    fmt.Printf("[BROKER] Publishing to '%s': %s\n", topic, string(jsonBytes))

    // Deliver to each subscriber independently — one slow handler doesn't block others
    var wg sync.WaitGroup
    for _, handler := range handlers {
        wg.Add(1)
        // Each handler runs in a separate goroutine simulating async delivery
        // => production systems use worker pools or async event loops instead
        go func(h func(map[string]interface{})) {
            defer wg.Done()
            h(message)
            // => message delivered concurrently to each subscriber
        }(handler)
    }
    wg.Wait()
}

// Subscriber services — each receives its own copy of the event
func emailService(message map[string]interface{}) {
    // Receives order.created event and sends confirmation email
    // => processes independently; failure here doesn't affect inventory service
    orderID := message["order_id"].(string)
    email := message["customer_email"].(string)
    fmt.Printf("[EMAIL]     Sending confirmation for order %s to %s\n", orderID, email)
}

func inventoryService(message map[string]interface{}) {
    // Decrements stock for each item in the order
    // => processes independently; can retry without affecting email service
    items := message["items"].([]interface{})
    for _, item := range items {
        m := item.(map[string]interface{})
        fmt.Printf("[INVENTORY] Reserving %.0f x %s\n", m["qty"].(float64), m["sku"].(string))
    }
}

func analyticsService(message map[string]interface{}) {
    // Records the order event for reporting dashboards
    // => decoupled from order placement; dashboard latency doesn't affect checkout
    fmt.Printf("[ANALYTICS] Recorded order %s worth $%.2f\n",
        message["order_id"].(string), message["total"].(float64))
}

func main() {
    // Wire up the broker
    broker := NewPubSubBroker()
    // => broker.subscribers is {}

    broker.Subscribe("order.created", emailService)
    // => broker.subscribers["order.created"] = [emailService]

    broker.Subscribe("order.created", inventoryService)
    // => broker.subscribers["order.created"] = [emailService, inventoryService]

    broker.Subscribe("order.created", analyticsService)
    // => broker.subscribers["order.created"] = [emailService, inventoryService, analyticsService]

    // Producer publishes a single event — all three subscribers receive a copy
    orderEvent := map[string]interface{}{
        "order_id":       "ORD-7821",
        "customer_email": "alice@example.com",
        "items": []interface{}{
            map[string]interface{}{"sku": "WIDGET-A", "qty": float64(2)},
            map[string]interface{}{"sku": "GADGET-B", "qty": float64(1)},
        },
        "total": 149.99,
    }
    broker.Publish("order.created", orderEvent)
    // => Output (order may vary due to goroutines):
    // [BROKER] Publishing to 'order.created': {...}
    // [EMAIL]     Sending confirmation for order ORD-7821 to alice@example.com
    // [INVENTORY] Reserving 2 x WIDGET-A
    // [INVENTORY] Reserving 1 x GADGET-B
    // [ANALYTICS] Recorded order ORD-7821 worth $149.99

    time.Sleep(100 * time.Millisecond) // allow goroutines to finish
}

Key takeaway: Pub/sub decouples event producers from consumers so each subscriber receives an independent copy and fails independently — adding a new consumer requires zero changes to the producer.

Why It Matters: In monoliths, adding a new post-order action (e.g., fraud check) means modifying the checkout function. With pub/sub, the fraud service subscribes to order.created independently. This eliminates coupling, allows services to be deployed separately, and means a crashing analytics service never blocks a user’s checkout. Kafka, RabbitMQ, and Google Pub/Sub all implement this pattern at scale.


Example 30: Point-to-Point Queue (Work Queue)

In a point-to-point queue, each message is delivered to exactly one consumer. Multiple workers compete to process messages, enabling horizontal scaling of CPU-intensive tasks. This differs from pub/sub where every subscriber gets a copy — here only one worker wins each message.

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// WorkQueue wraps Go's buffered channel with producer/consumer helpers
type WorkQueue struct {
    ch        chan map[string]string
    processed int
    mu        sync.Mutex
}

func NewWorkQueue(maxSize int) *WorkQueue {
    return &WorkQueue{
        // Buffered channel provides natural backpressure
        // => maxSize=20 means send blocks when 20 items are queued
        ch: make(chan map[string]string, maxSize),
    }
}

func (wq *WorkQueue) Enqueue(task map[string]string) {
    // Send blocks if channel is full — provides natural backpressure to producers
    // => producer slows down automatically when workers can't keep up
    wq.ch <- task
    fmt.Printf("[QUEUE] Enqueued task %s (qsize~=%d)\n", task["id"], len(wq.ch))
}

func (wq *WorkQueue) StartWorker(workerID int, wg *sync.WaitGroup) {
    // Each worker runs range on channel — exactly one worker receives each item
    // => competitive consumption: workers race to acquire each task
    go func() {
        defer wg.Done()
        for task := range wq.ch {
            // Simulate variable processing time (e.g., image resize, PDF render)
            duration := time.Duration(50+rand.Intn(100)) * time.Millisecond
            time.Sleep(duration)
            // => only one worker processes this task; others process different tasks

            wq.mu.Lock()
            wq.processed++
            count := wq.processed
            wq.mu.Unlock()
            fmt.Printf("[WORKER-%d] Processed task %s in %dms (total=%d)\n",
                workerID, task["id"], duration.Milliseconds(), count)
        }
    }()
}

func (wq *WorkQueue) Shutdown() {
    // Closing the channel signals all workers to exit when drained
    // => range loop exits when channel is closed and empty
    close(wq.ch)
}

func main() {
    numWorkers := 3
    wq := NewWorkQueue(20)
    // => wq.ch is empty, bounded at 20 items

    // Start three competing workers — they share the same channel
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        wq.StartWorker(i, &wg)
    }
    // => workers[0], workers[1], workers[2] all blocked on channel receive

    // Enqueue 9 tasks — distributed across 3 workers, ~3 tasks each
    for i := 0; i < 9; i++ {
        wq.Enqueue(map[string]string{
            "id":      fmt.Sprintf("TASK-%03d", i),
            "payload": fmt.Sprintf("data_%d", i),
        })
    }
    // => Output example (worker assignment varies):
    // [QUEUE] Enqueued task TASK-000 (qsize=1)
    // [WORKER-0] Processed task TASK-000 in 80ms (total=1)
    // [WORKER-1] Processed task TASK-001 in 120ms (total=2)
    // ... (each task processed by exactly one worker)

    wq.Shutdown()
    wg.Wait()
    // => all workers exit via channel close, Wait returns when all done
    fmt.Printf("[DONE] All tasks processed. Total: %d\n", wq.processed)
    // => Output: [DONE] All tasks processed. Total: 9
}

Key takeaway: Point-to-point queues distribute work across competing consumers so each task is processed exactly once — adding workers scales throughput linearly up to queue saturation.

Why It Matters: Image processing, email sending, and report generation are inherently parallel tasks. A single-threaded processor becomes a bottleneck. Work queues let you horizontally scale workers independently of the API layer — if a task backlog grows, deploy more workers. SQS, RabbitMQ (default mode), and Celery all implement this competing-consumer pattern.


Database Replication

Example 31: Master-Slave Replication

Master-slave (primary-replica) replication routes all writes to one primary node, which streams changes to read-only replicas. Reads scale horizontally across replicas while the primary handles all mutations. Replica lag — the delay between primary write and replica visibility — is a critical operational concern.

    graph TD
    App["Application"]:::blue
    Primary["Primary<br/>Read + Write"]:::orange
    R1["Replica 1<br/>Read Only"]:::teal
    R2["Replica 2<br/>Read Only"]:::teal

    App -->|"writes (INSERT/UPDATE/DELETE)"| Primary
    Primary -->|"async replication stream"| R1
    Primary -->|"async replication stream"| R2
    App -->|"reads (SELECT)"| R1
    App -->|"reads (SELECT)"| R2

    classDef blue fill:#0173B2,stroke:#000,color:#fff
    classDef orange fill:#DE8F05,stroke:#000,color:#fff
    classDef teal fill:#029E73,stroke:#000,color:#fff
  
package main

import (
    "fmt"
    "sync"
    "time"
)

type Record struct {
    ID      int
    Value   string
    Version int // monotonically increasing — detects replication order issues
}

// PrimaryNode: accepts writes and broadcasts to registered replicas
type PrimaryNode struct {
    nodeID   string
    store    map[int]*Record
    replicas []*ReplicaNode
    version  int
    mu       sync.Mutex
}

func NewPrimaryNode(nodeID string) *PrimaryNode {
    return &PrimaryNode{
        nodeID: nodeID,
        store:  make(map[int]*Record),
    }
}

func (p *PrimaryNode) RegisterReplica(r *ReplicaNode) {
    // Replicas register at startup; primary pushes changes to all of them
    p.replicas = append(p.replicas, r)
    fmt.Printf("[%s] Replica '%s' registered\n", p.nodeID, r.nodeID)
}

func (p *PrimaryNode) Write(recordID int, value string) *Record {
    p.mu.Lock()
    p.version++
    record := &Record{ID: recordID, Value: value, Version: p.version}
    p.store[recordID] = record
    // => write committed to primary synchronously
    p.mu.Unlock()

    fmt.Printf("[%s] WRITE id=%d value='%s' version=%d\n",
        p.nodeID, recordID, value, record.Version)

    // Replicate asynchronously — primary doesn't wait for replicas to confirm
    // => this is the source of replica lag; primary returns immediately
    for _, replica := range p.replicas {
        go replica.applyReplication(record)
    }

    return record
}

func (p *PrimaryNode) Read(recordID int) *Record {
    // Primary reads are always consistent — no lag possible
    p.mu.Lock()
    defer p.mu.Unlock()
    return p.store[recordID]
}

// ReplicaNode: read-only; applies replication events from primary
type ReplicaNode struct {
    nodeID string
    store  map[int]*Record
    lagMs  float64 // Simulated replication lag in milliseconds
    mu     sync.Mutex
}

func NewReplicaNode(nodeID string, lagMs float64) *ReplicaNode {
    return &ReplicaNode{
        nodeID: nodeID,
        store:  make(map[int]*Record),
        // real MySQL/PostgreSQL lag is 1-500ms typically
        // => high-traffic primaries can push lag to seconds under write storms
        lagMs: lagMs,
    }
}

func (r *ReplicaNode) applyReplication(record *Record) {
    // Simulate network + disk write delay on replica
    time.Sleep(time.Duration(r.lagMs) * time.Millisecond)
    r.mu.Lock()
    r.store[record.ID] = record
    r.mu.Unlock()
    fmt.Printf("[%s] REPLICATED id=%d version=%d (lag=%.0fms)\n",
        r.nodeID, record.ID, record.Version, r.lagMs)
}

func (r *ReplicaNode) Read(recordID int) *Record {
    // Reads from replica — may return stale data if replication hasn't caught up
    r.mu.Lock()
    record := r.store[recordID]
    r.mu.Unlock()
    staleness := "FRESH"
    if record == nil {
        staleness = "STALE/MISSING"
    }
    fmt.Printf("[%s] READ id=%d -> %s\n", r.nodeID, recordID, staleness)
    return record
}

func main() {
    // Setup: one primary, two replicas with different lag characteristics
    primary := NewPrimaryNode("primary")
    replica1 := NewReplicaNode("replica-1", 30)
    replica2 := NewReplicaNode("replica-2", 80)

    primary.RegisterReplica(replica1)
    primary.RegisterReplica(replica2)
    // => [primary] Replica 'replica-1' registered
    // => [primary] Replica 'replica-2' registered

    // Write to primary
    primary.Write(1, "hello")
    // => [primary] WRITE id=1 value='hello' version=1

    // Immediately read from replica — may be stale!
    time.Sleep(10 * time.Millisecond) // 10ms — replica-1 lag is 30ms, so both are still behind
    replica1.Read(1)
    // => [replica-1] READ id=1 -> STALE/MISSING (replication hasn't arrived yet)

    time.Sleep(100 * time.Millisecond) // wait 100ms — both replicas have now caught up
    replica1.Read(1)
    // => [replica-1] READ id=1 -> FRESH
    // => replica-1 returns Record{ID:1, Value:"hello", Version:1}
}

Key takeaway: Master-slave replication scales reads horizontally but introduces replica lag — reads immediately after writes may return stale data, requiring applications to read from the primary for consistency-critical operations.

Why It Matters: Most web applications are read-heavy (10:1 read-write ratio or higher). Routing reads to replicas multiplies read throughput proportional to replica count. However, replica lag creates “read-your-own-write” inconsistency: a user updates their profile and immediately sees the old value. Systems handle this by reading from the primary after a user’s own writes, or by waiting for replica confirmation (semi-synchronous replication).


Example 32: Multi-Master Replication and Conflict Resolution

Multi-master (active-active) replication allows writes to any node, with changes propagated to all peers. This enables geographic write distribution but introduces write conflicts when two nodes modify the same record concurrently. Conflict resolution strategies (last-write-wins, application-defined merge) determine which value survives.

package main

import (
    "fmt"
    "sync"
    "time"
)

type ConflictStrategy int

const (
    LastWriteWins   ConflictStrategy = iota // highest timestamp wins — simple, lossy
    HighestVersion                          // highest version counter wins
)

type VersionedRecord struct {
    ID        int
    Value     string
    Timestamp float64 // wall clock at write time — used for LWW
    NodeID    string  // which master originated this write
    Version   int     // logical clock increment per write
}

type MasterNode struct {
    nodeID   string
    strategy ConflictStrategy
    store    map[int]*VersionedRecord
    peers    []*MasterNode
    mu       sync.Mutex
}

func NewMasterNode(nodeID string, strategy ConflictStrategy) *MasterNode {
    return &MasterNode{
        nodeID:   nodeID,
        strategy: strategy,
        store:    make(map[int]*VersionedRecord),
    }
}

func (m *MasterNode) AddPeer(peer *MasterNode) {
    // Bidirectional peering: each master knows all other masters
    m.peers = append(m.peers, peer)
}

func (m *MasterNode) Write(recordID int, value string) *VersionedRecord {
    m.mu.Lock()
    current := m.store[recordID]
    newVersion := 1
    if current != nil {
        newVersion = current.Version + 1
    }
    record := &VersionedRecord{
        ID:        recordID,
        Value:     value,
        Timestamp: float64(time.Now().UnixNano()) / 1e9,
        NodeID:    m.nodeID,
        Version:   newVersion,
    }
    m.store[recordID] = record
    m.mu.Unlock()

    fmt.Printf("[%s] LOCAL WRITE id=%d value='%s' v=%d ts=%.4f\n",
        m.nodeID, recordID, value, record.Version, record.Timestamp)

    // Propagate to peers asynchronously — conflict possible if both wrote concurrently
    for _, peer := range m.peers {
        go peer.receiveReplication(record)
    }
    return record
}

func (m *MasterNode) receiveReplication(incoming *VersionedRecord) {
    // Simulate network delay — this is when conflicts become visible
    time.Sleep(50 * time.Millisecond)
    m.mu.Lock()
    defer m.mu.Unlock()

    existing := m.store[incoming.ID]
    if existing == nil {
        // No local record — apply unconditionally
        m.store[incoming.ID] = incoming
        fmt.Printf("[%s] REPLICATED id=%d from %s (no conflict)\n",
            m.nodeID, incoming.ID, incoming.NodeID)
        return
    }

    // CONFLICT: both nodes have a version of this record
    // => apply conflict resolution strategy
    winner := m.resolveConflict(existing, incoming)
    m.store[incoming.ID] = winner
    loserNode := incoming.NodeID
    if winner == incoming {
        loserNode = existing.NodeID
    }
    fmt.Printf("[%s] CONFLICT id=%d: winner='%s' from %s, DISCARDED write from %s\n",
        m.nodeID, incoming.ID, winner.Value, winner.NodeID, loserNode)
}

func (m *MasterNode) resolveConflict(local, incoming *VersionedRecord) *VersionedRecord {
    if m.strategy == LastWriteWins {
        // LWW: higher timestamp wins — assumes clocks are synchronized (NTP)
        // => clock skew can cause older writes to win; data loss is possible
        if local.Timestamp >= incoming.Timestamp {
            return local
        }
        return incoming
    }
    // Version-based: higher logical version wins
    // => version monotonicity only holds if both nodes started from same base
    if local.Version >= incoming.Version {
        return local
    }
    return incoming
}

func (m *MasterNode) Read(recordID int) *VersionedRecord {
    m.mu.Lock()
    defer m.mu.Unlock()
    return m.store[recordID]
}

func main() {
    // Two geographically distributed masters (e.g., US-East and EU-West)
    masterUS := NewMasterNode("master-us", LastWriteWins)
    masterEU := NewMasterNode("master-eu", LastWriteWins)
    masterUS.AddPeer(masterEU)
    masterEU.AddPeer(masterUS)

    // Concurrent writes to same record from different nodes — guaranteed conflict
    masterUS.Write(42, "US value")
    masterEU.Write(42, "EU value")
    // => [master-us] LOCAL WRITE id=42 value='US value' v=1 ts=...
    // => [master-eu] LOCAL WRITE id=42 value='EU value' v=1 ts=...

    time.Sleep(200 * time.Millisecond) // wait for replication to propagate
    // => [master-us] CONFLICT id=42: winner='EU value' from master-eu (or vice versa)
    // => Both masters converge to same value — eventual consistency achieved

    usResult := masterUS.Read(42)
    euResult := masterEU.Read(42)
    fmt.Printf("US reads: '%s', EU reads: '%s'\n", usResult.Value, euResult.Value)
    // => Both nodes converge to the same value (LWW picks one winner)
    // => Output: US reads: 'EU value', EU reads: 'EU value'  (or both 'US value')
}

Key takeaway: Multi-master replication enables geographic write distribution but requires explicit conflict resolution — last-write-wins is simple but lossy, while application-defined merges preserve data at the cost of complexity.

Why It Matters: Global applications (e.g., Google Docs, CRDTs in collaborative editing) need low-latency writes in every region. Routing all writes to a single master in US-East adds 200ms+ latency for EU users. Multi-master solves write latency but introduces conflicts. CassandraDB, DynamoDB, and CouchDB use this pattern with LWW or vector-clock conflict detection.


Database Sharding

Example 33: Range-Based Sharding

Range sharding partitions data by a sorted key range — shard 0 owns IDs 1-10000, shard 1 owns 10001-20000, etc. A routing layer maps each key to its shard. Range sharding enables efficient range scans but suffers from hotspots when traffic concentrates on one range.

package main

import (
    "fmt"
    "sort"
)

type Shard struct {
    ShardID    int
    RangeStart int // Inclusive lower bound of key range this shard owns
    RangeEnd   int // Inclusive upper bound of key range this shard owns
    store      map[int]map[string]string
}

func NewShard(shardID, rangeStart, rangeEnd int) *Shard {
    return &Shard{
        ShardID:    shardID,
        RangeStart: rangeStart,
        RangeEnd:   rangeEnd,
        store:      make(map[int]map[string]string),
    }
}

func (s *Shard) Insert(key int, value map[string]string) {
    s.store[key] = value
    fmt.Printf("[Shard-%d] INSERT key=%d range=[%d,%d]\n",
        s.ShardID, key, s.RangeStart, s.RangeEnd)
}

func (s *Shard) Get(key int) (map[string]string, bool) {
    v, ok := s.store[key]
    return v, ok
}

func (s *Shard) RangeScan(start, end int) []map[string]string {
    // Range scan is efficient: only keys in this shard's range are relevant
    // => no cross-shard communication needed for queries within one shard's range
    var results []map[string]string
    for k, v := range s.store {
        if k >= start && k <= end {
            results = append(results, v)
        }
    }
    return results
}

// RangeShardRouter: maps any key to its owning shard
type RangeShardRouter struct {
    shards      []*Shard
    breakpoints []int
}

func NewRangeShardRouter(shards []*Shard) *RangeShardRouter {
    // Shards must be sorted by RangeStart for binary search to work correctly
    sort.Slice(shards, func(i, j int) bool {
        return shards[i].RangeStart < shards[j].RangeStart
    })
    // Extract RangeStart values as sorted breakpoints for binary search
    // => sort.SearchInts(breakpoints, key) gives index for routing
    breakpoints := make([]int, len(shards))
    for i, s := range shards {
        breakpoints[i] = s.RangeStart
    }
    return &RangeShardRouter{shards: shards, breakpoints: breakpoints}
}

func (r *RangeShardRouter) Route(key int) *Shard {
    // Binary search returns insertion point — subtract 1 to find owning shard
    // => example: key=15000, breakpoints=[1,10001,20001], search=2 -> shard index 1
    idx := sort.SearchInts(r.breakpoints, key+1) - 1
    if idx < 0 {
        return nil // key is below first shard's RangeStart
    }
    shard := r.shards[idx]
    // Verify key is within this shard's upper bound (handles gaps in ranges)
    if key > shard.RangeEnd {
        return nil // key falls in an unassigned gap
    }
    return shard
}

func (r *RangeShardRouter) Insert(key int, value map[string]string) error {
    shard := r.Route(key)
    if shard == nil {
        return fmt.Errorf("no shard configured for key=%d", key)
    }
    shard.Insert(key, value)
    return nil
}

func (r *RangeShardRouter) Get(key int) (map[string]string, bool) {
    shard := r.Route(key)
    if shard == nil {
        return nil, false
    }
    return shard.Get(key)
}

func (r *RangeShardRouter) CrossShardRangeScan(start, end int) []map[string]string {
    // Cross-shard range scan: must query every shard whose range overlaps [start, end]
    // => this is why range sharding is good for range queries within a shard
    // => but cross-shard scans still require fan-out to multiple shards
    var results []map[string]string
    for _, shard := range r.shards {
        if shard.RangeStart <= end && shard.RangeEnd >= start {
            results = append(results, shard.RangeScan(start, end)...)
        }
    }
    return results
}

func main() {
    // Configure three shards covering user IDs 1 to 30000
    shards := []*Shard{
        NewShard(0, 1, 10000),
        NewShard(1, 10001, 20000),
        NewShard(2, 20001, 30000),
    }
    router := NewRangeShardRouter(shards)
    // => router.breakpoints = [1, 10001, 20001]

    // Insert records — router sends each to the correct shard
    router.Insert(500, map[string]string{"name": "Alice", "plan": "free"})
    // => [Shard-0] INSERT key=500 range=[1,10000]
    router.Insert(15000, map[string]string{"name": "Bob", "plan": "pro"})
    // => [Shard-1] INSERT key=15000 range=[10001,20000]
    router.Insert(25000, map[string]string{"name": "Charlie", "plan": "enterprise"})
    // => [Shard-2] INSERT key=25000 range=[20001,30000]

    // Point lookup — O(log n) routing via binary search, then O(1) store lookup
    record, _ := router.Get(15000)
    fmt.Printf("GET 15000: %v\n", record)
    // => GET 15000: map[name:Bob plan:pro]

    // Range scan crossing shard boundary — fan-out to shards 0 and 1
    results := router.CrossShardRangeScan(9000, 11000)
    fmt.Printf("Range scan [9000, 11000]: %d records\n", len(results))
    // => Range scan [9000, 11000]: 2 records (Alice from Shard-0, Bob from Shard-1)

    // Hotspot demonstration: all new users cluster at the top of the range
    // => shard-2 receives all traffic while shard-0 and shard-1 sit idle
    // => solution: hash sharding (Example 34) distributes writes uniformly
}

Key takeaway: Range sharding enables efficient range scans within a shard but creates hotspots when write patterns are skewed — adding all new records to one shard while others remain underloaded.

Why It Matters: Relational databases like MySQL and PostgreSQL can’t scale writes beyond one machine. Range sharding is the default strategy for time-series data (partition by month), order IDs (partition by range), and geographic keys (partition by region code). HBase and Bigtable use range-partitioned tablets. The hotspot problem drives adoption of hash sharding for write-heavy workloads.


Example 34: Hash-Based Sharding

Hash sharding applies a hash function to the partition key and uses modulo to select a shard. This distributes writes uniformly regardless of key ordering, eliminating hotspots. The trade-off is that range scans require querying all shards in parallel.

package main

import (
    "crypto/md5"
    "encoding/binary"
    "fmt"
)

type ShardNode struct {
    ShardID int
    store   map[string]map[string]interface{}
}

func NewShardNode(id int) *ShardNode {
    return &ShardNode{ShardID: id, store: make(map[string]map[string]interface{})}
}

func (s *ShardNode) Put(key string, value map[string]interface{}) {
    s.store[key] = value
}

func (s *ShardNode) Get(key string) (map[string]interface{}, bool) {
    v, ok := s.store[key]
    return v, ok
}

func (s *ShardNode) AllItems() []struct {
    Key   string
    Value map[string]interface{}
} {
    var items []struct {
        Key   string
        Value map[string]interface{}
    }
    for k, v := range s.store {
        items = append(items, struct {
            Key   string
            Value map[string]interface{}
        }{k, v})
    }
    return items
}

type HashShardRouter struct {
    numShards int
    shards    []*ShardNode
}

func NewHashShardRouter(numShards int) *HashShardRouter {
    shards := make([]*ShardNode, numShards)
    for i := 0; i < numShards; i++ {
        shards[i] = NewShardNode(i)
    }
    // => shards = [ShardNode(0), ShardNode(1), ShardNode(2), ShardNode(3)]
    return &HashShardRouter{numShards: numShards, shards: shards}
}

func (r *HashShardRouter) shardIndex(key string) int {
    // MD5 hash produces a 128-bit value — deterministic for same key
    // => same key ALWAYS routes to the same shard across all application instances
    hash := md5.Sum([]byte(key))
    // Convert first 4 bytes to unsigned int for modulo operation
    hashInt := binary.BigEndian.Uint32(hash[:4])
    // Modulo maps the hash space uniformly onto shard count
    // => for 4 shards, shard = hashInt % 4 — each shard gets ~25% of keys
    return int(hashInt) % r.numShards
}

func (r *HashShardRouter) Put(key string, value map[string]interface{}) {
    idx := r.shardIndex(key)
    r.shards[idx].Put(key, value)
    fmt.Printf("[ROUTER] PUT '%s' -> Shard-%d\n", key, idx)
}

func (r *HashShardRouter) Get(key string) (map[string]interface{}, bool) {
    idx := r.shardIndex(key)
    return r.shards[idx].Get(key)
}

func (r *HashShardRouter) ScatterGatherAll() []struct {
    Key   string
    Value map[string]interface{}
} {
    // Range queries require scatter-gather: query every shard, merge results
    // => O(numShards) network round trips vs O(1) for point lookups
    // => this is the primary cost of hash sharding for analytical queries
    var results []struct {
        Key   string
        Value map[string]interface{}
    }
    for _, shard := range r.shards {
        results = append(results, shard.AllItems()...)
    }
    return results
}

func (r *HashShardRouter) DistributionStats() map[int]int {
    // Show how evenly keys are distributed across shards
    // => uniform distribution confirms hash function is working correctly
    stats := make(map[int]int)
    for _, s := range r.shards {
        stats[s.ShardID] = len(s.store)
    }
    return stats
}

func main() {
    router := NewHashShardRouter(4)
    // => 4 shards, each will hold ~25% of keys

    // Insert user records — hash distributes them across shards
    users := []struct {
        key   string
        value map[string]interface{}
    }{
        {"user:alice", map[string]interface{}{"age": 30, "plan": "pro"}},
        {"user:bob", map[string]interface{}{"age": 25, "plan": "free"}},
        {"user:charlie", map[string]interface{}{"age": 35, "plan": "enterprise"}},
        {"user:diana", map[string]interface{}{"age": 28, "plan": "pro"}},
        {"user:eve", map[string]interface{}{"age": 22, "plan": "free"}},
        {"user:frank", map[string]interface{}{"age": 40, "plan": "enterprise"}},
        {"user:grace", map[string]interface{}{"age": 31, "plan": "pro"}},
        {"user:henry", map[string]interface{}{"age": 27, "plan": "free"}},
    }

    for _, u := range users {
        router.Put(u.key, u.value)
    }
    // => [ROUTER] PUT 'user:alice' -> Shard-X  (shard varies by hash)
    // => keys distribute roughly evenly — no single shard overwhelmed

    // Point lookup — O(1): compute hash, go directly to correct shard
    alice, _ := router.Get("user:alice")
    fmt.Printf("GET user:alice: %v\n", alice)
    // => GET user:alice: map[age:30 plan:pro]

    // Distribution report — verify even spread
    stats := router.DistributionStats()
    fmt.Printf("Shard distribution: %v\n", stats)
    // => Shard distribution: map[0:2 1:2 2:2 3:2]  (approximately even)

    // Scatter-gather: to find all "pro" users, must query all 4 shards
    allRecords := router.ScatterGatherAll()
    proCount := 0
    for _, r := range allRecords {
        if r.Value["plan"] == "pro" {
            proCount++
        }
    }
    fmt.Printf("Pro users (scatter-gather): %d\n", proCount)
    // => Pro users (scatter-gather): 3  (alice, diana, grace)
    // => required querying all 4 shards — expensive for large shard counts
}

Key takeaway: Hash sharding eliminates hotspots by distributing writes uniformly across shards, but makes range queries expensive because the data ordering is destroyed by hashing.

Why It Matters: Hash sharding is the default strategy in Cassandra (partition key hashing), Redis Cluster (CRC16 slot assignment), and DynamoDB (partition key hashing). When user IDs, order IDs, or session tokens are the access pattern, hash sharding ensures no single node becomes a bottleneck. The scatter-gather cost for analytics is typically acceptable because analytical queries run on separate read replicas or data warehouses.


Consistent Hashing

Example 35: Consistent Hashing Ring

Consistent hashing maps both nodes and keys onto a circular hash ring. Each key is assigned to the nearest node clockwise on the ring. When a node is added or removed, only the keys between the removed node and its predecessor need to be remapped — minimizing data movement compared to modulo hashing.

    graph TD
    Ring["Hash Ring<br/>0 to 2^32-1"]:::blue
    N1["Node A<br/>position=100"]:::teal
    N2["Node B<br/>position=200"]:::teal
    N3["Node C<br/>position=300"]:::teal
    K1["Key 'user:1'<br/>hash=150 -> Node B"]:::orange
    K2["Key 'user:2'<br/>hash=250 -> Node C"]:::orange
    K3["Key 'user:3'<br/>hash=50  -> Node A"]:::orange

    Ring --> N1
    Ring --> N2
    Ring --> N3
    N1 --> K3
    N2 --> K1
    N3 --> K2

    classDef blue fill:#0173B2,stroke:#000,color:#fff
    classDef orange fill:#DE8F05,stroke:#000,color:#fff
    classDef teal fill:#029E73,stroke:#000,color:#fff
  
package main

import (
    "crypto/sha256"
    "encoding/binary"
    "fmt"
    "sort"
)

type Node struct {
    NodeID  string
    Address string // e.g., "cache-1.internal:6379"
}

type ConsistentHashRing struct {
    virtualNodes   int
    ringPositions  []uint64            // sorted list of hash positions on the ring
    positionToNode map[uint64]*Node    // maps ring position -> Node
}

func NewConsistentHashRing(virtualNodes int) *ConsistentHashRing {
    return &ConsistentHashRing{
        // virtualNodes: each physical node gets N positions on the ring
        // => more virtual nodes = more uniform distribution across fewer physical nodes
        // => 100-200 virtual nodes per physical node is typical (Cassandra default: 256)
        virtualNodes:   virtualNodes,
        positionToNode: make(map[uint64]*Node),
    }
}

func (r *ConsistentHashRing) hash(key string) uint64 {
    // SHA-256 gives 256-bit hash; take first 8 bytes for a 64-bit ring
    // => consistent: same input always produces same hash across all instances
    digest := sha256.Sum256([]byte(key))
    return binary.BigEndian.Uint64(digest[:8])
}

func (r *ConsistentHashRing) AddNode(node *Node) {
    // Place N virtual nodes on the ring for this physical node
    for i := 0; i < r.virtualNodes; i++ {
        // Virtual node key: "nodeID:replica_N" — each has a unique ring position
        virtualKey := fmt.Sprintf("%s:replica_%d", node.NodeID, i)
        position := r.hash(virtualKey)
        r.ringPositions = append(r.ringPositions, position)
        r.positionToNode[position] = node
    }
    // Sort to maintain sorted order for binary search
    sort.Slice(r.ringPositions, func(i, j int) bool {
        return r.ringPositions[i] < r.ringPositions[j]
    })
    fmt.Printf("[RING] Added node '%s' with %d virtual nodes\n", node.NodeID, r.virtualNodes)
}

func (r *ConsistentHashRing) RemoveNode(node *Node) {
    // Remove all virtual nodes for this physical node
    for i := 0; i < r.virtualNodes; i++ {
        virtualKey := fmt.Sprintf("%s:replica_%d", node.NodeID, i)
        position := r.hash(virtualKey)
        // Only keys between removed node and its predecessor need remapping
        // => O(k/N) keys remapped, where k=total keys, N=node count
        delete(r.positionToNode, position)
        idx := sort.Search(len(r.ringPositions), func(j int) bool {
            return r.ringPositions[j] >= position
        })
        if idx < len(r.ringPositions) && r.ringPositions[idx] == position {
            r.ringPositions = append(r.ringPositions[:idx], r.ringPositions[idx+1:]...)
        }
    }
    fmt.Printf("[RING] Removed node '%s'\n", node.NodeID)
}

func (r *ConsistentHashRing) GetNode(key string) *Node {
    if len(r.ringPositions) == 0 {
        panic("Ring has no nodes")
    }
    keyHash := r.hash(key)
    // Find the first ring position >= keyHash (clockwise lookup)
    idx := sort.Search(len(r.ringPositions), func(i int) bool {
        return r.ringPositions[i] >= keyHash
    })
    // Wrap around: if keyHash is past the last position, use position 0
    // => this is what makes it a "ring" — the last node wraps to serve before the first
    if idx == len(r.ringPositions) {
        idx = 0
    }
    position := r.ringPositions[idx]
    return r.positionToNode[position]
}

func main() {
    // Build a cache cluster with 3 nodes
    ring := NewConsistentHashRing(50)
    nodeA := &Node{"cache-a", "10.0.0.1:6379"}
    nodeB := &Node{"cache-b", "10.0.0.2:6379"}
    nodeC := &Node{"cache-c", "10.0.0.3:6379"}

    ring.AddNode(nodeA)
    ring.AddNode(nodeB)
    ring.AddNode(nodeC)
    // => 150 total positions on ring (50 virtual * 3 physical)

    // Route several cache keys to their owning nodes
    keys := []string{"user:alice", "user:bob", "session:xyz", "product:123", "cart:456"}
    assignments := make(map[string]string)
    for _, key := range keys {
        node := ring.GetNode(key)
        assignments[key] = node.NodeID
        fmt.Printf("[RING] '%s' -> %s (%s)\n", key, node.NodeID, node.Address)
    }

    // Simulate adding a 4th node (scale-out event)
    nodeD := &Node{"cache-d", "10.0.0.4:6379"}
    ring.AddNode(nodeD)
    // => [RING] Added node 'cache-d' with 50 virtual nodes

    // Check which keys changed assignment — only ~25% should move (1/4 of keys)
    fmt.Println("\n[RING] Key reassignments after adding cache-d:")
    moved := 0
    for _, key := range keys {
        newNode := ring.GetNode(key)
        if assignments[key] != newNode.NodeID {
            moved++
            fmt.Printf("  MOVED '%s': %s -> %s\n", key, assignments[key], newNode.NodeID)
        }
    }
    fmt.Printf("  %d/%d keys remapped (~%.0f%%, expected ~25%%)\n",
        moved, len(keys), float64(moved)/float64(len(keys))*100)
    // => Only ~25% of keys remapped — consistent hashing's key advantage
}

Key takeaway: Consistent hashing remaps only 1/N of keys when a node is added or removed, versus modulo hashing which remaps nearly all keys — making it essential for cache clusters where remapping means cache misses.

Why It Matters: Memcached and Redis Cluster use consistent hashing to minimize cache invalidation during node scaling events. Without it, adding one cache server causes ~50% of all cache keys to miss simultaneously (thundering herd). Amazon DynamoDB, Cassandra, and Riak all use consistent hashing variants with virtual nodes for even load distribution across heterogeneous hardware.


Rate Limiting

Example 36: Token Bucket Rate Limiter

The token bucket algorithm maintains a bucket that fills with tokens at a fixed rate. Each request consumes one token. If the bucket is empty, the request is rejected or queued. The bucket has a maximum capacity, allowing short bursts above the sustained rate — a key distinction from fixed-window rate limiting.

    graph LR
    Req["Incoming<br/>Requests"]:::blue
    Bucket["Token Bucket<br/>capacity=10<br/>rate=5/sec"]:::orange
    Allow["Allowed<br/>Requests"]:::teal
    Deny["Rejected<br/>Requests #40;429#41;"]:::purple

    Req -->|consume token| Bucket
    Bucket -->|token available| Allow
    Bucket -->|bucket empty| Deny

    classDef blue fill:#0173B2,stroke:#000,color:#fff
    classDef orange fill:#DE8F05,stroke:#000,color:#fff
    classDef teal fill:#029E73,stroke:#000,color:#fff
    classDef purple fill:#CC78BC,stroke:#000,color:#fff
  
package main

import (
    "fmt"
    "sync"
    "time"
)

type TokenBucketRateLimiter struct {
    capacity   float64
    refillRate float64
    tokens     float64
    lastRefill time.Time
    mu         sync.Mutex
}

func NewTokenBucketRateLimiter(capacity, refillRate float64) *TokenBucketRateLimiter {
    return &TokenBucketRateLimiter{
        // capacity: maximum tokens the bucket can hold (burst limit)
        // => capacity=10 means a user can send 10 requests instantaneously after idle
        capacity: capacity,
        // refillRate: tokens added per second (sustained rate limit)
        // => refillRate=5 means 5 requests/second sustained throughput
        refillRate: refillRate,
        // Start with full bucket — user gets burst allowance from first request
        tokens:     capacity,
        lastRefill: time.Now(),
    }
}

func (t *TokenBucketRateLimiter) refill() {
    now := time.Now()
    elapsed := now.Sub(t.lastRefill).Seconds()
    // Add tokens proportional to elapsed time
    // => 0.5s elapsed at 5 tokens/sec = 2.5 new tokens
    newTokens := elapsed * t.refillRate
    // min() enforces capacity ceiling — tokens never exceed bucket size
    t.tokens += newTokens
    if t.tokens > t.capacity {
        t.tokens = t.capacity
    }
    t.lastRefill = now
}

func (t *TokenBucketRateLimiter) Allow(tokensNeeded float64) bool {
    t.mu.Lock()
    defer t.mu.Unlock()
    // Refill before checking — lazy evaluation avoids background threads
    // => refill happens on each request, not on a timer (more efficient)
    t.refill()
    if t.tokens >= tokensNeeded {
        // Sufficient tokens: consume and allow request
        t.tokens -= tokensNeeded
        // => t.tokens decremented by tokensNeeded
        return true
    }
    // Insufficient tokens: reject with 429 Too Many Requests
    return false
}

func (t *TokenBucketRateLimiter) TokenCount() float64 {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.refill()
    return t.tokens
}

func main() {
    // Rate limiter: 10-token burst, 5 tokens/sec sustained
    limiter := NewTokenBucketRateLimiter(10, 5)
    // => limiter.tokens = 10.0 (full bucket at startup)

    // Burst phase: 10 rapid requests should all succeed (consume the full bucket)
    fmt.Println("=== Burst phase (10 rapid requests) ===")
    for i := 0; i < 12; i++ {
        result := limiter.Allow(1.0)
        status := "ALLOWED"
        if !result {
            status = "REJECTED"
        }
        fmt.Printf("  Request %02d: %s (tokens remaining: %.2f)\n",
            i+1, status, limiter.TokenCount())
    }
    // => Requests 1-10: ALLOWED (bucket drains from 10 to 0)
    // => Requests 11-12: REJECTED (bucket empty, 429)

    // Wait for refill: 0.4s at 5 tokens/sec = 2 new tokens
    fmt.Println("\n=== After 0.4s wait (5 tokens/sec * 0.4s = 2 tokens) ===")
    time.Sleep(400 * time.Millisecond)
    for i := 0; i < 3; i++ {
        result := limiter.Allow(1.0)
        status := "ALLOWED"
        if !result {
            status = "REJECTED"
        }
        fmt.Printf("  Request %d: %s (tokens: %.2f)\n",
            i+1, status, limiter.TokenCount())
    }
    // => Request 1: ALLOWED  (2 tokens -> 1 token)
    // => Request 2: ALLOWED  (1 token -> 0 tokens)
    // => Request 3: REJECTED (0 tokens, 0.4s hasn't refilled enough for a 3rd)
}

Key takeaway: Token bucket allows short bursts up to the bucket capacity while enforcing a sustained rate — the bucket absorbs traffic spikes without rejecting legitimate bursty clients.

Why It Matters: Fixed-window rate limiting (e.g., 100 requests/minute) allows 200 requests in two seconds straddling a window boundary. Token bucket prevents this by continuously tracking consumption. Stripe, AWS API Gateway, and Nginx all use token bucket variants. The burst capacity parameter is critical — too small and legitimate users are rejected during normal traffic spikes; too large and the rate limit loses meaning.


Example 37: Sliding Window Rate Limiter

The sliding window algorithm maintains a log of request timestamps and counts requests in a rolling time window. Unlike fixed windows, it has no boundary burst problem — the window always represents the most recent N seconds of activity regardless of clock alignment.

package main

import (
    "fmt"
    "sync"
    "time"
)

type SlidingWindowRateLimiter struct {
    maxRequests   int
    windowSeconds float64
    requestTimes  []float64
    mu            sync.Mutex
}

func NewSlidingWindowRateLimiter(maxRequests int, windowSeconds float64) *SlidingWindowRateLimiter {
    return &SlidingWindowRateLimiter{
        // maxRequests: maximum allowed requests within any windowSeconds interval
        // => windowSeconds=60, maxRequests=100 means: at most 100 req in any 60s window
        maxRequests:   maxRequests,
        windowSeconds: windowSeconds,
    }
}

func (s *SlidingWindowRateLimiter) Allow() (bool, map[string]interface{}) {
    s.mu.Lock()
    defer s.mu.Unlock()

    now := float64(time.Now().UnixNano()) / 1e9
    // Evict timestamps older than the sliding window
    // => ensures slice always reflects only the last windowSeconds
    windowStart := now - s.windowSeconds
    filtered := s.requestTimes[:0]
    for _, t := range s.requestTimes {
        if t >= windowStart {
            filtered = append(filtered, t)
        }
    }
    s.requestTimes = filtered

    currentCount := len(s.requestTimes)
    remaining := s.maxRequests - currentCount

    if currentCount < s.maxRequests {
        // Under limit: record this request's timestamp
        s.requestTimes = append(s.requestTimes, now)
        // => slice grows by 1; will be evicted after windowSeconds
        return true, map[string]interface{}{
            "allowed":   true,
            "count":     currentCount + 1,
            "remaining": remaining - 1,
            "reset_in":  s.windowSeconds,
        }
    }
    // At limit: calculate when the oldest request will expire
    // => tells caller when to retry — important for retry-after headers
    oldest := s.requestTimes[0]
    retryAfter := (oldest + s.windowSeconds) - now
    return false, map[string]interface{}{
        "allowed":             false,
        "count":               currentCount,
        "remaining":           0,
        "retry_after_seconds": retryAfter,
    }
}

func main() {
    // Limiter: 5 requests per 10-second sliding window
    limiter := NewSlidingWindowRateLimiter(5, 10)

    fmt.Println("=== Phase 1: Fill the window ===")
    for i := 0; i < 7; i++ {
        allowed, info := limiter.Allow()
        status := "OK "
        if !allowed {
            status = "429"
        }
        fmt.Printf("  Request %d: %s | count=%v | remaining=%v\n",
            i+1, status, info["count"], info["remaining"])
        time.Sleep(100 * time.Millisecond) // 100ms between requests
    }
    // => Requests 1-5: OK  (window fills to capacity)
    // => Requests 6-7: 429 (window at limit)

    fmt.Println("\n=== Phase 2: Wait for window to slide ===")
    time.Sleep(1500 * time.Millisecond) // 1.5s: oldest requests have NOT expired (window=10s)
    allowed, _ := limiter.Allow()
    status := "OK"
    if !allowed {
        status = "429"
    }
    fmt.Printf("  After 1.5s: %s — oldest requests still within 10s window\n", status)
    // => Still 429 — sliding window correctly rejects

    // Contrast with fixed window: requests clustered at the end of minute 0
    // and start of minute 1 would all pass fixed-window but are caught by sliding window
    fmt.Println("\n=== Demonstrating sliding window advantage ===")
    limiter2 := NewSlidingWindowRateLimiter(5, 10)
    for i := 0; i < 5; i++ {
        limiter2.Allow() // fill the first 5
    }
    allowed2, _ := limiter2.Allow()
    result := "BLOCKED (correct)"
    if allowed2 {
        result = "BYPASSED (bad)"
    }
    fmt.Printf("  Bypass attempt: %s\n", result)
    // => BLOCKED (correct) — sliding window has no boundary vulnerability
}

Key takeaway: Sliding window rate limiting eliminates the fixed-window boundary burst problem by always measuring the most recent window_seconds of activity relative to each request.

Why It Matters: Fixed windows allow 2x the intended rate at window boundaries — a known attack vector for APIs. Sliding windows prevent this at the cost of O(max_requests) memory per user (storing timestamps). GitHub’s API uses sliding windows. At scale, Redis sorted sets (ZRANGEBYSCORE + ZADD + ZREMRANGEBYSCORE) implement sliding windows efficiently with O(log N) operations per request.


Circuit Breaker Pattern

Example 38: Circuit Breaker with Three States

The circuit breaker pattern wraps external calls and tracks failures. After a failure threshold is reached, the circuit “opens” and rejects requests immediately without attempting the call — protecting the downstream service from overload. After a timeout, it enters “half-open” state to probe recovery.

    graph LR
    Closed["CLOSED<br/>Normal operation"]:::teal
    Open["OPEN<br/>Fast fail #40;reject all#41;"]:::purple
    HalfOpen["HALF-OPEN<br/>Probe recovery"]:::orange

    Closed -->|"failures >= threshold"| Open
    Open -->|"timeout elapsed"| HalfOpen
    HalfOpen -->|"probe succeeds"| Closed
    HalfOpen -->|"probe fails"| Open

    classDef teal fill:#029E73,stroke:#000,color:#fff
    classDef purple fill:#CC78BC,stroke:#000,color:#fff
    classDef orange fill:#DE8F05,stroke:#000,color:#fff
  
package main

import (
    "fmt"
    "sync"
    "time"
)

type CircuitState int

const (
    Closed   CircuitState = iota // normal: requests pass through to downstream
    Open                         // tripped: requests fail fast, downstream is protected
    HalfOpen                     // probing: one request allowed to test recovery
)

func (s CircuitState) String() string {
    switch s {
    case Closed:
        return "CLOSED"
    case Open:
        return "OPEN"
    case HalfOpen:
        return "HALF_OPEN"
    }
    return "UNKNOWN"
}

type CircuitBreakerOpenError struct {
    Message string
}

func (e *CircuitBreakerOpenError) Error() string { return e.Message }

type CircuitBreaker struct {
    failureThreshold  int
    recoveryTimeout   time.Duration
    successThreshold  int
    state             CircuitState
    failureCount      int
    successCount      int
    openedAt          time.Time
    mu                sync.Mutex
}

func NewCircuitBreaker(failureThreshold int, recoveryTimeout time.Duration, successThreshold int) *CircuitBreaker {
    return &CircuitBreaker{
        // failureThreshold: consecutive failures before tripping to OPEN
        // => 5 failures means a transient spike doesn't trip the circuit
        failureThreshold: failureThreshold,
        // recoveryTimeout: duration to wait in OPEN before trying HALF_OPEN
        // => gives downstream service time to restart/recover
        recoveryTimeout: recoveryTimeout,
        // successThreshold: consecutive successes in HALF_OPEN before closing
        // => 2 successes confirms recovery is stable, not a fluke
        successThreshold: successThreshold,
        state:            Closed,
    }
}

func (cb *CircuitBreaker) State() CircuitState {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    return cb.state
}

func (cb *CircuitBreaker) Call(fn func() (interface{}, error)) (interface{}, error) {
    cb.mu.Lock()
    if cb.state == Open {
        elapsed := time.Since(cb.openedAt)
        if elapsed >= cb.recoveryTimeout {
            // Timeout expired: transition to HALF_OPEN to probe recovery
            cb.state = HalfOpen
            cb.successCount = 0
            fmt.Printf("[CB] OPEN -> HALF_OPEN (timeout elapsed: %.1fs)\n", elapsed.Seconds())
        } else {
            // Still within timeout: fast-fail without calling downstream
            // => protects downstream from receiving requests during recovery
            remaining := cb.recoveryTimeout - elapsed
            cb.mu.Unlock()
            return nil, &CircuitBreakerOpenError{
                Message: fmt.Sprintf("Circuit OPEN. Retry in %.1fs", remaining.Seconds()),
            }
        }
    }
    cb.mu.Unlock()

    // Attempt the actual call (outside lock to avoid holding lock during I/O)
    result, err := fn()
    if err != nil {
        cb.onFailure()
        return nil, err
    }
    cb.onSuccess()
    return result, nil
}

func (cb *CircuitBreaker) onSuccess() {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    if cb.state == HalfOpen {
        cb.successCount++
        fmt.Printf("[CB] HALF_OPEN success %d/%d\n", cb.successCount, cb.successThreshold)
        if cb.successCount >= cb.successThreshold {
            // Enough consecutive successes: close the circuit — normal operation resumes
            cb.state = Closed
            cb.failureCount = 0
            fmt.Println("[CB] HALF_OPEN -> CLOSED (service recovered)")
        }
    } else if cb.state == Closed {
        // Successful call resets consecutive failure counter
        cb.failureCount = 0
    }
}

func (cb *CircuitBreaker) onFailure() {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    cb.failureCount++
    fmt.Printf("[CB] Failure %d/%d\n", cb.failureCount, cb.failureThreshold)
    if cb.state == HalfOpen {
        // Probe failed: recovery not complete, re-open immediately
        cb.state = Open
        cb.openedAt = time.Now()
        fmt.Println("[CB] HALF_OPEN -> OPEN (probe failed)")
    } else if cb.failureCount >= cb.failureThreshold {
        // Threshold reached: trip to OPEN
        cb.state = Open
        cb.openedAt = time.Now()
        fmt.Println("[CB] CLOSED -> OPEN (failure threshold reached)")
    }
}

func main() {
    // Simulate a payment service that fails temporarily
    callCount := 0
    paymentService := func() (interface{}, error) {
        callCount++
        n := callCount
        // Fail for calls 1-6, succeed after
        if n <= 6 {
            return nil, fmt.Errorf("Payment gateway timeout (call #%d)", n)
        }
        return map[string]interface{}{
            "status": "success", "amount": 99.99,
            "txn_id": fmt.Sprintf("TXN-%d", n),
        }, nil
    }

    cb := NewCircuitBreaker(3, 500*time.Millisecond, 2)
    // => cb.state = CLOSED, cb.failureCount = 0

    // Phase 1: failures trip the circuit
    for i := 0; i < 4; i++ {
        result, err := cb.Call(paymentService)
        if err != nil {
            if _, ok := err.(*CircuitBreakerOpenError); ok {
                fmt.Printf("  [FAST FAIL] %s\n", err)
            } else {
                fmt.Printf("  [ERROR] %s | state=%s\n", err, cb.State())
            }
        } else {
            fmt.Printf("  Payment OK: %v\n", result)
        }
    }
    // => Calls 1-3: error (failureCount hits threshold=3)
    // => Call 4: FAST FAIL (circuit OPEN — paymentService never called)

    time.Sleep(600 * time.Millisecond) // wait for recoveryTimeout=0.5s

    // Phase 2: probe recovery
    callCount = 6 // next call will succeed
    for i := 0; i < 3; i++ {
        result, err := cb.Call(paymentService)
        if err != nil {
            fmt.Printf("  [ERROR] %s | state=%s\n", err, cb.State())
        } else {
            fmt.Printf("  Payment OK: %v | state=%s\n", result, cb.State())
        }
    }
    // => Call 1 in HALF_OPEN: success (successCount=1)
    // => Call 2 in HALF_OPEN: success (successCount=2 >= threshold=2) -> CLOSED
    // => Call 3 in CLOSED: success (normal operation)
}

Key takeaway: The circuit breaker prevents cascading failures by fast-failing requests when a downstream service is unhealthy, giving it time to recover without being overwhelmed by retries.

Why It Matters: Without circuit breakers, a slow payment gateway causes request queues to back up, threads to exhaust, and the entire checkout service to fail. Netflix’s Hystrix popularized this pattern. Modern equivalents include Resilience4j (Java), Polly (.NET), and built-in support in service meshes like Istio. The half-open state is critical — without it, the circuit stays open forever or re-opens immediately on the first retry.


API Gateway

Example 39: API Gateway with Request Routing and Authentication

An API gateway is a single entry point for all client requests that handles cross-cutting concerns — authentication, rate limiting, routing, and request transformation — so individual microservices don’t need to implement them. It decouples clients from service topology changes.

package main

import (
    "fmt"
    "regexp"
    "strings"
    "time"
)

type Request struct {
    Method  string
    Path    string
    Headers map[string]string
    Body    map[string]interface{}
}

type Response struct {
    Status int
    Body   map[string]interface{}
}

type MiddlewareFn func(*Request, func(*Request) *Response) *Response

type APIGateway struct {
    routes      []struct {
        method  string
        pattern string
        handler func(*Request, map[string]string) *Response
    }
    middleware   []MiddlewareFn
    validTokens map[string]string
}

func NewAPIGateway() *APIGateway {
    return &APIGateway{
        // Simulated JWT token store: token -> user_id
        validTokens: map[string]string{
            "token-alice": "user-alice",
            "token-admin": "user-admin",
        },
    }
}

func (g *APIGateway) Use(mw MiddlewareFn) {
    // Register middleware in the pipeline — order matters
    // => authentication before rate limiting before routing
    g.middleware = append(g.middleware, mw)
}

func (g *APIGateway) Route(method, pattern string, handler func(*Request, map[string]string) *Response) {
    // Register a route: HTTP method + URL pattern (supports :param style) + handler
    g.routes = append(g.routes, struct {
        method  string
        pattern string
        handler func(*Request, map[string]string) *Response
    }{method, pattern, handler})
}

func (g *APIGateway) findHandler(req *Request) (func(*Request, map[string]string) *Response, map[string]string) {
    for _, route := range g.routes {
        if route.method != req.Method {
            continue
        }
        // Convert :param segments to named regex groups
        // => /users/:id becomes /users/(?P<id>[^/]+)
        regexStr := regexp.MustCompile(`:(\w+)`).ReplaceAllString(route.pattern, `(?P<$1>[^/]+)`)
        re := regexp.MustCompile("^" + regexStr + "$")
        match := re.FindStringSubmatch(req.Path)
        if match != nil {
            params := make(map[string]string)
            for i, name := range re.SubexpNames() {
                if i > 0 && name != "" {
                    params[name] = match[i]
                }
            }
            return route.handler, params
        }
    }
    return nil, nil
}

func (g *APIGateway) Handle(req *Request) *Response {
    // Build middleware chain from innermost (routing) outward
    routingHandler := func(r *Request) *Response {
        handler, params := g.findHandler(r)
        if handler == nil {
            return &Response{404, map[string]interface{}{"error": "Not Found", "path": r.Path}}
        }
        return handler(r, params)
    }

    // Compose middleware stack: each wraps the next
    chain := routingHandler
    for i := len(g.middleware) - 1; i >= 0; i-- {
        mw := g.middleware[i]
        next := chain
        chain = func(r *Request) *Response {
            return mw(r, next)
        }
    }

    return chain(req)
}

// --- Middleware implementations ---

var gateway *APIGateway // package-level for middleware closure access

func authMiddleware(req *Request, next func(*Request) *Response) *Response {
    // Extract Bearer token from Authorization header
    authHeader := req.Headers["Authorization"]
    if !strings.HasPrefix(authHeader, "Bearer ") {
        return &Response{401, map[string]interface{}{"error": "Missing or invalid Authorization header"}}
    }
    token := authHeader[7:] // strip "Bearer " prefix
    // Validate token against token store (production: verify JWT signature)
    userID, ok := gateway.validTokens[token]
    if !ok {
        return &Response{401, map[string]interface{}{"error": "Invalid token"}}
    }
    // Attach user_id to headers so downstream handlers can use it
    req.Headers["X-User-Id"] = userID
    return next(req)
}

// Simple in-memory rate limiter per IP
var rateLimitStore = map[string][]float64{}

func rateLimitMiddleware(req *Request, next func(*Request) *Response) *Response {
    clientIP := req.Headers["X-Forwarded-For"]
    if clientIP == "" {
        clientIP = "127.0.0.1"
    }
    now := float64(time.Now().UnixNano()) / 1e9
    window := 1.0 // 1-second sliding window
    limit := 10   // 10 requests per second per IP
    // Evict timestamps outside the window
    var filtered []float64
    for _, t := range rateLimitStore[clientIP] {
        if now-t < window {
            filtered = append(filtered, t)
        }
    }
    rateLimitStore[clientIP] = filtered
    if len(rateLimitStore[clientIP]) >= limit {
        return &Response{429, map[string]interface{}{"error": "Rate limit exceeded"}}
    }
    rateLimitStore[clientIP] = append(rateLimitStore[clientIP], now)
    return next(req)
}

func loggingMiddleware(req *Request, next func(*Request) *Response) *Response {
    start := time.Now()
    resp := next(req)
    elapsed := time.Since(start)
    fmt.Printf("[GATEWAY] %s %s -> %d (%.2fms)\n",
        req.Method, req.Path, resp.Status, float64(elapsed.Microseconds())/1000)
    return resp
}

// --- Service handlers ---

func getUser(req *Request, params map[string]string) *Response {
    userID := params["id"]
    requester := req.Headers["X-User-Id"]
    // => X-User-Id was injected by authMiddleware — service trusts the gateway
    return &Response{200, map[string]interface{}{
        "user_id": userID, "requested_by": requester, "status": "active",
    }}
}

func listProducts(req *Request, params map[string]string) *Response {
    return &Response{200, map[string]interface{}{
        "products": []string{"widget-a", "gadget-b"}, "total": 2,
    }}
}

func main() {
    // Assemble the gateway
    gateway = NewAPIGateway()
    gateway.Use(loggingMiddleware)      // outermost: logs all requests including auth failures
    gateway.Use(authMiddleware)         // second: authenticates before rate limiting
    gateway.Use(rateLimitMiddleware)    // innermost middleware: rate limit authenticated users

    gateway.Route("GET", "/api/users/:id", getUser)
    gateway.Route("GET", "/api/products", listProducts)

    // Authenticated request
    req1 := &Request{"GET", "/api/users/42",
        map[string]string{"Authorization": "Bearer token-alice", "X-Forwarded-For": "1.2.3.4"}, nil}
    resp1 := gateway.Handle(req1)
    fmt.Printf("Response: %d %v\n", resp1.Status, resp1.Body)
    // => [GATEWAY] GET /api/users/42 -> 200 (Xms)
    // => Response: 200 map[requested_by:user-alice status:active user_id:42]

    // Unauthenticated request
    req2 := &Request{"GET", "/api/products",
        map[string]string{"X-Forwarded-For": "1.2.3.5"}, nil}
    resp2 := gateway.Handle(req2)
    fmt.Printf("Response: %d %v\n", resp2.Status, resp2.Body)
    // => [GATEWAY] GET /api/products -> 401 (Xms)
    // => Response: 401 map[error:Missing or invalid Authorization header]
}

Key takeaway: An API gateway centralizes cross-cutting concerns (auth, rate limiting, logging) so each microservice implements only its business logic — changes to auth policy update one place, not every service.

Why It Matters: Without an API gateway, every microservice must implement authentication, rate limiting, and logging independently. As the service count grows to dozens, this becomes unmanageable. AWS API Gateway, Kong, and Nginx serve this role in production. Service meshes (Istio, Linkerd) push this further — handling auth and observability at the infrastructure layer via sidecar proxies.


Service Discovery

Example 40: Service Registry and Client-Side Discovery

Service discovery solves the problem of services finding each other in dynamic environments where IP addresses change on restarts. A service registry (like Consul or Eureka) stores service locations. Client-side discovery means the client queries the registry and load-balances itself — no proxy in the request path.

package main

import (
    "fmt"
    "sync"
    "time"
)

type ServiceInstance struct {
    InstanceID  string
    ServiceName string
    Host        string
    Port        int
    RegisteredAt time.Time
    Metadata    map[string]string
}

func (s *ServiceInstance) Address() string {
    return fmt.Sprintf("%s:%d", s.Host, s.Port)
}

type ServiceRegistry struct {
    ttl       time.Duration
    instances map[string]map[string]*ServiceInstance
    // => {serviceName: {instanceID: ServiceInstance}}
    mu sync.Mutex
}

func NewServiceRegistry(ttl time.Duration) *ServiceRegistry {
    return &ServiceRegistry{
        // ttl: instances must re-register within this window or be evicted
        // => prevents stale instances from accumulating after crashes
        ttl:       ttl,
        instances: make(map[string]map[string]*ServiceInstance),
    }
}

func (r *ServiceRegistry) Register(inst *ServiceInstance) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.instances[inst.ServiceName] == nil {
        r.instances[inst.ServiceName] = make(map[string]*ServiceInstance)
    }
    inst.RegisteredAt = time.Now()
    r.instances[inst.ServiceName][inst.InstanceID] = inst
    fmt.Printf("[REGISTRY] Registered %s/%s at %s\n",
        inst.ServiceName, inst.InstanceID, inst.Address())
}

func (r *ServiceRegistry) Deregister(serviceName, instanceID string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if svc, ok := r.instances[serviceName]; ok {
        delete(svc, instanceID)
        fmt.Printf("[REGISTRY] Deregistered %s/%s\n", serviceName, instanceID)
    }
}

func (r *ServiceRegistry) Heartbeat(serviceName, instanceID string) bool {
    // Services send periodic heartbeats to renew their TTL
    // => if a service crashes, heartbeats stop and the registry evicts it after TTL
    r.mu.Lock()
    defer r.mu.Unlock()
    if svc, ok := r.instances[serviceName]; ok {
        if inst, ok := svc[instanceID]; ok {
            inst.RegisteredAt = time.Now()
            return true
        }
    }
    return false
}

func (r *ServiceRegistry) Discover(serviceName string) []*ServiceInstance {
    // Return only live instances (within TTL)
    r.mu.Lock()
    defer r.mu.Unlock()
    now := time.Now()
    svc := r.instances[serviceName]
    var live []*ServiceInstance
    var expired []string
    for id, inst := range svc {
        if now.Sub(inst.RegisteredAt) < r.ttl {
            live = append(live, inst)
        } else {
            expired = append(expired, id)
        }
    }
    // Evict expired instances in-place
    for _, id := range expired {
        delete(svc, id)
        fmt.Printf("[REGISTRY] Evicted expired instance %s/%s\n", serviceName, id)
    }
    return live
}

type ServiceClient struct {
    registry   *ServiceRegistry
    rrCounters map[string]int
    // Round-robin counter per service — simple, stateless load distribution
}

func NewServiceClient(registry *ServiceRegistry) *ServiceClient {
    return &ServiceClient{
        registry:   registry,
        rrCounters: make(map[string]int),
    }
}

func (c *ServiceClient) GetInstance(serviceName string) *ServiceInstance {
    instances := c.registry.Discover(serviceName)
    if len(instances) == 0 {
        fmt.Printf("[CLIENT] No live instances for '%s'\n", serviceName)
        return nil
    }
    // Round-robin selection across available instances
    idx := c.rrCounters[serviceName] % len(instances)
    c.rrCounters[serviceName] = idx + 1
    selected := instances[idx]
    fmt.Printf("[CLIENT] Resolved '%s' -> %s (instance %s)\n",
        serviceName, selected.Address(), selected.InstanceID)
    return selected
}

func (c *ServiceClient) Call(serviceName, endpoint string) map[string]interface{} {
    inst := c.GetInstance(serviceName)
    if inst == nil {
        return map[string]interface{}{"error": "Service unavailable", "service": serviceName}
    }
    // In real code: make HTTP request to inst.Address() + endpoint
    // => simulating success response here
    return map[string]interface{}{"status": "ok", "served_by": inst.Address(), "endpoint": endpoint}
}

func main() {
    // Setup registry with two payment service instances
    registry := NewServiceRegistry(5 * time.Second)

    // Services register on startup
    inst1 := &ServiceInstance{InstanceID: "payment-a", ServiceName: "payment-service",
        Host: "10.0.1.10", Port: 8080}
    inst2 := &ServiceInstance{InstanceID: "payment-b", ServiceName: "payment-service",
        Host: "10.0.1.11", Port: 8080}

    registry.Register(inst1)
    registry.Register(inst2)
    // => [REGISTRY] Registered payment-service/payment-a at 10.0.1.10:8080
    // => [REGISTRY] Registered payment-service/payment-b at 10.0.1.11:8080

    client := NewServiceClient(registry)

    // Client calls are round-robined across both instances
    for i := 0; i < 4; i++ {
        result := client.Call("payment-service", "/charge")
        fmt.Printf("  Call %d: %s\n", i+1, result["served_by"])
    }
    // => Call 1: 10.0.1.10:8080 (payment-a)
    // => Call 2: 10.0.1.11:8080 (payment-b)
    // => Call 3: 10.0.1.10:8080 (payment-a)  (round-robin wraps)
    // => Call 4: 10.0.1.11:8080 (payment-b)

    // Simulate instance crash: payment-a stops sending heartbeats, TTL expires
    time.Sleep(6 * time.Second) // wait for TTL=5s to expire
    result := client.Call("payment-service", "/charge")
    fmt.Printf("  After crash: %v\n", result)
    // => [REGISTRY] Evicted expired instance payment-service/payment-a
    // => [CLIENT] Resolved 'payment-service' -> 10.0.1.11:8080 (only payment-b alive)
}

Key takeaway: Service registries with TTL-based health expiry automatically remove crashed instances — clients always discover only live services without manual intervention.

Why It Matters: In container environments (Kubernetes, ECS), service IP addresses change on every restart. Hard-coding IPs in config files causes outages. Service discovery (Consul, Eureka, Kubernetes DNS) provides stable logical names that resolve to current healthy instances. Client-side discovery puts load-balancing logic in the client; server-side discovery (via a load balancer proxy) centralizes it — both patterns are in wide use.


Health Checks

Example 41: Health Check Endpoints with Dependency Probing

Health checks allow orchestrators (Kubernetes, load balancers) to route traffic only to healthy instances. A basic liveness check confirms the process is running; a readiness check confirms all dependencies (database, cache, downstream services) are reachable and the instance can serve traffic.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

type HealthStatus struct {
    Healthy   bool
    Status    string // "healthy" | "degraded" | "unhealthy"
    Checks    map[string]map[string]interface{}
    Timestamp float64
}

type HealthChecker struct {
    // Registry of named checks: each returns (bool, string)
    // => bool: check passed, string: human-readable message
    checks map[string]func() (bool, string)
}

func NewHealthChecker() *HealthChecker {
    return &HealthChecker{checks: make(map[string]func() (bool, string))}
}

func (h *HealthChecker) Register(name string, checkFn func() (bool, string)) {
    // Register a named dependency check
    // => checks are registered at startup and run on each health probe
    h.checks[name] = checkFn
}

func (h *HealthChecker) CheckLiveness() *HealthStatus {
    // Liveness: is the process alive? Almost always returns healthy.
    // => Kubernetes kills and restarts the pod if liveness fails
    // => should NOT include dependency checks — only self-checks
    return &HealthStatus{
        Healthy: true,
        Status:  "healthy",
        Checks: map[string]map[string]interface{}{
            "process": {"healthy": true, "message": "Process is running"},
        },
        Timestamp: float64(time.Now().Unix()),
    }
}

func (h *HealthChecker) CheckReadiness() *HealthStatus {
    // Readiness: can this instance serve traffic?
    // => Kubernetes removes pod from Service endpoints if readiness fails
    // => includes all dependency checks (DB, cache, downstream services)
    results := make(map[string]map[string]interface{})
    allHealthy := true

    for name, checkFn := range h.checks {
        ok, message := checkFn()
        results[name] = map[string]interface{}{"healthy": ok, "message": message}
        if !ok {
            allHealthy = false
        }
    }

    // Degraded: some checks pass, some fail — instance can partially serve
    anyHealthy := false
    for _, r := range results {
        if r["healthy"].(bool) {
            anyHealthy = true
            break
        }
    }
    status := "unhealthy"
    if allHealthy {
        status = "healthy"
    } else if anyHealthy {
        status = "degraded" // load balancer may still route to this instance
    }

    return &HealthStatus{
        Healthy:   allHealthy,
        Status:    status,
        Checks:    results,
        Timestamp: float64(time.Now().Unix()),
    }
}

func main() {
    // --- Simulated dependency checkers ---
    dbCallCount := 0

    checkDatabase := func() (bool, string) {
        dbCallCount++
        // Simulate DB timeout on 3rd check
        if dbCallCount == 3 {
            return false, "Database connection timeout after 5000ms"
        }
        latencyMs := 1.0 + rand.Float64()*19.0
        return true, fmt.Sprintf("PostgreSQL reachable (latency=%.1fms)", latencyMs)
    }

    checkRedis := func() (bool, string) {
        latencyMs := 0.5 + rand.Float64()*4.5
        return true, fmt.Sprintf("Redis PING ok (latency=%.1fms)", latencyMs)
    }

    checkDownstreamPayment := func() (bool, string) {
        // => if payment service is down, this instance is not ready to process payments
        if rand.Float64() > 0.2 { // 80% uptime simulation
            return true, "Payment gateway /health returned 200"
        }
        return false, "Payment gateway /health returned 503"
    }

    checker := NewHealthChecker()
    checker.Register("database", checkDatabase)
    checker.Register("redis", checkRedis)
    checker.Register("payment-gateway", checkDownstreamPayment)

    fmt.Println("=== Liveness Probe ===")
    liveness := checker.CheckLiveness()
    httpCode := 200
    if !liveness.Healthy {
        httpCode = 503
    }
    fmt.Printf("  Status: %s (HTTP %d)\n", liveness.Status, httpCode)
    // => Status: healthy (HTTP 200)

    fmt.Println("\n=== Readiness Probe (Run 1: all healthy) ===")
    readiness := checker.CheckReadiness()
    fmt.Printf("  Status: %s\n", readiness.Status)
    for dep, result := range readiness.Checks {
        icon := "OK"
        if !result["healthy"].(bool) {
            icon = "FAIL"
        }
        fmt.Printf("  [%s] %s: %s\n", icon, dep, result["message"])
    }

    fmt.Println("\n=== Readiness Probe (Run 2: simulate DB failure on call 3) ===")
    // Force DB check to be call #3 (fails)
    dbCallCount = 2
    readiness2 := checker.CheckReadiness()
    fmt.Printf("  Status: %s\n", readiness2.Status)
    for dep, result := range readiness2.Checks {
        icon := "OK"
        if !result["healthy"].(bool) {
            icon = "FAIL"
        }
        fmt.Printf("  [%s] %s: %s\n", icon, dep, result["message"])
    }
    // => [FAIL] database: Database connection timeout after 5000ms
    // => Status: degraded or unhealthy (DB failed)
    // => Kubernetes removes this instance from Service load balancing
}

Key takeaway: Separate liveness and readiness probes allow orchestrators to distinguish between “process is stuck” (kill it) and “dependencies are down” (stop routing traffic but don’t kill it).

Why It Matters: A pod that has lost its database connection is not liveness-failed — killing it won’t help if the database is down. But it IS readiness-failed — traffic should not route to it. Kubernetes uses /healthz for liveness and /readyz for readiness. AWS ALB target health checks use a single endpoint. Getting these probe semantics wrong causes cascading restarts or traffic to dead instances during database outages.


Microservices Communication

Example 42: Synchronous vs Asynchronous Communication

Synchronous communication (HTTP/gRPC) couples caller and callee temporally — the caller waits for a response. Asynchronous communication (message queues) decouples them — the caller publishes an event and continues. Each pattern has distinct failure modes and latency characteristics.

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// --- Synchronous HTTP-style communication ---

type PaymentServiceSync struct{}

func (p *PaymentServiceSync) Charge(orderID string, amount float64) (map[string]interface{}, error) {
    // Simulate network latency + payment processing time
    processingTime := time.Duration(100+rand.Intn(400)) * time.Millisecond
    time.Sleep(processingTime)
    // Simulate occasional failures (20% failure rate)
    if rand.Float64() < 0.2 {
        return nil, fmt.Errorf("Payment gateway timeout for order %s", orderID)
    }
    return map[string]interface{}{
        "order_id": orderID, "status": "charged", "amount": amount,
        "latency_ms": processingTime.Milliseconds(),
    }, nil
}

type OrderServiceSync struct {
    payment       *PaymentServiceSync
    responseTimes []float64
}

func (o *OrderServiceSync) PlaceOrder(orderID string, amount float64) map[string]interface{} {
    start := time.Now()
    // BLOCKING: order service waits for payment before returning to user
    // => if payment is slow, user sees slow response
    // => if payment is down, order placement fails immediately
    result, err := o.payment.Charge(orderID, amount)
    o.responseTimes = append(o.responseTimes, time.Since(start).Seconds())
    if err != nil {
        // => caller must handle downstream failure directly
        return map[string]interface{}{"order_id": orderID, "status": "failed", "reason": err.Error()}
    }
    return map[string]interface{}{"order_id": orderID, "status": "confirmed", "payment": result}
}

// --- Asynchronous message-queue communication ---

type PaymentWorkerAsync struct {
    resultStore map[string]map[string]interface{}
    mu          sync.Mutex
}

func (w *PaymentWorkerAsync) Process(event map[string]interface{}) {
    orderID := event["order_id"].(string)
    // Simulate async payment processing — worker controls its own pace
    time.Sleep(time.Duration(100+rand.Intn(400)) * time.Millisecond)
    w.mu.Lock()
    defer w.mu.Unlock()
    if rand.Float64() < 0.2 {
        w.resultStore[orderID] = map[string]interface{}{"status": "payment_failed"}
    } else {
        w.resultStore[orderID] = map[string]interface{}{
            "status": "payment_completed", "amount": event["amount"],
        }
    }
}

type OrderServiceAsync struct {
    eventCh       chan map[string]interface{}
    resultStore   map[string]map[string]interface{}
    responseTimes []float64
    mu            sync.Mutex
}

func (o *OrderServiceAsync) PlaceOrder(orderID string, amount float64) map[string]interface{} {
    start := time.Now()
    // NON-BLOCKING: enqueue event and return immediately to user
    // => user gets instant acknowledgment — payment happens asynchronously
    event := map[string]interface{}{
        "order_id": orderID, "amount": amount,
        "timestamp": time.Now().Unix(),
    }
    o.eventCh <- event
    elapsed := time.Since(start).Seconds()
    o.mu.Lock()
    o.responseTimes = append(o.responseTimes, elapsed)
    o.mu.Unlock()
    // => response time is channel send latency (~microseconds), not payment latency
    return map[string]interface{}{
        "order_id": orderID, "status": "pending",
        "message": "Order accepted, payment processing in background",
    }
}

func (o *OrderServiceAsync) GetOrderStatus(orderID string) map[string]interface{} {
    // Caller polls for eventual result — or receives webhook when ready
    if result, ok := o.resultStore[orderID]; ok {
        return result
    }
    return map[string]interface{}{"status": "pending"}
}

func main() {
    fmt.Println("=== Synchronous Pattern ===")
    syncPayment := &PaymentServiceSync{}
    syncOrders := &OrderServiceSync{payment: syncPayment}

    for i := 0; i < 3; i++ {
        result := syncOrders.PlaceOrder(fmt.Sprintf("ORD-%03d", i), 99.99)
        fmt.Printf("  %s: %s\n", result["order_id"], result["status"])
    }

    avgSync := 0.0
    for _, t := range syncOrders.responseTimes {
        avgSync += t
    }
    avgSync /= float64(len(syncOrders.responseTimes))
    fmt.Printf("  Avg response time: %.0fms (includes payment latency)\n", avgSync*1000)
    // => Avg response time: ~200ms (blocks for payment processing)

    fmt.Println("\n=== Asynchronous Pattern ===")
    resultStore := make(map[string]map[string]interface{})
    eventCh := make(chan map[string]interface{}, 100)
    asyncOrders := &OrderServiceAsync{
        eventCh: eventCh, resultStore: resultStore,
    }
    worker := &PaymentWorkerAsync{resultStore: resultStore}

    // Start background worker
    go func() {
        for event := range eventCh {
            worker.Process(event)
        }
    }()

    for i := 0; i < 3; i++ {
        result := asyncOrders.PlaceOrder(fmt.Sprintf("ORD-A%03d", i), 99.99)
        fmt.Printf("  %s: %s (returned immediately)\n", result["order_id"], result["status"])
    }

    avgAsync := 0.0
    for _, t := range asyncOrders.responseTimes {
        avgAsync += t
    }
    avgAsync /= float64(len(asyncOrders.responseTimes))
    fmt.Printf("  Avg response time: %.2fms (queue enqueue only)\n", avgAsync*1000)
    // => Avg response time: ~0.10ms — 2000x faster than synchronous

    time.Sleep(1 * time.Second) // allow worker to process
    fmt.Println("\n=== Eventual results after async processing ===")
    for i := 0; i < 3; i++ {
        status := asyncOrders.GetOrderStatus(fmt.Sprintf("ORD-A%03d", i))
        fmt.Printf("  ORD-A%03d: %v\n", i, status)
    }
    // => ORD-A000: map[amount:99.99 status:payment_completed]
    // => ORD-A001: map[amount:99.99 status:payment_completed]
    // => ORD-A002: map[status:payment_failed]
}

Key takeaway: Synchronous calls are simple but couple caller availability to callee availability — async messaging decouples them at the cost of eventual consistency and more complex status tracking.

Why It Matters: E-commerce checkout that synchronously calls payment, inventory, fraud, and shipping would chain four failure points into a single user request. Async messaging (Kafka, SQS) lets each service process independently and retry without blocking users. The trade-off is that users see “pending” instead of “confirmed” — acceptable for most workflows, but not for financial transactions that require immediate confirmation.


Data Serialization

Example 43: JSON Serialization and Schema Validation

JSON is the dominant wire format for REST APIs — human-readable, language-agnostic, and widely supported. Schema validation (using JSON Schema) ensures incoming data matches expected structure before processing, preventing malformed data from corrupting business logic.

package main

import (
    "encoding/json"
    "fmt"
    "strings"
)

// SchemaValidator: JSON Schema subset validator — demonstrates key validation concepts
// Production: use a library like github.com/santhosh-tekuri/jsonschema
type SchemaValidator struct{}

func (v *SchemaValidator) Validate(data interface{}, schema map[string]interface{}) []string {
    var errors []string
    v.validateNode(data, schema, "$", &errors)
    return errors
}

func (v *SchemaValidator) validateNode(value interface{}, schema map[string]interface{}, path string, errors *[]string) {
    expectedType, _ := schema["type"].(string)
    if expectedType == "" {
        return
    }

    // Type validation
    valid := false
    switch expectedType {
    case "object":
        _, valid = value.(map[string]interface{})
    case "array":
        _, valid = value.([]interface{})
    case "string":
        _, valid = value.(string)
    case "integer":
        if f, ok := value.(float64); ok {
            valid = f == float64(int(f))
        }
    case "number":
        _, valid = value.(float64)
    case "boolean":
        _, valid = value.(bool)
    }

    if !valid {
        *errors = append(*errors, fmt.Sprintf("%s: expected %s, got %T", path, expectedType, value))
        return
    }

    switch expectedType {
    case "object":
        obj := value.(map[string]interface{})
        // Required fields check
        if required, ok := schema["required"].([]interface{}); ok {
            for _, req := range required {
                reqStr := req.(string)
                if _, exists := obj[reqStr]; !exists {
                    *errors = append(*errors, fmt.Sprintf("%s.%s: required field is missing", path, reqStr))
                }
            }
        }
        // Validate each declared property
        if props, ok := schema["properties"].(map[string]interface{}); ok {
            for prop, propSchema := range props {
                if val, exists := obj[prop]; exists {
                    v.validateNode(val, propSchema.(map[string]interface{}), path+"."+prop, errors)
                }
            }
        }
    case "string":
        s := value.(string)
        if minLen, ok := schema["minLength"].(float64); ok && float64(len(s)) < minLen {
            *errors = append(*errors, fmt.Sprintf("%s: string too short (min=%.0f, got=%d)", path, minLen, len(s)))
        }
        if maxLen, ok := schema["maxLength"].(float64); ok && float64(len(s)) > maxLen {
            *errors = append(*errors, fmt.Sprintf("%s: string too long (max=%.0f, got=%d)", path, maxLen, len(s)))
        }
    case "integer", "number":
        num := value.(float64)
        if minimum, ok := schema["minimum"].(float64); ok && num < minimum {
            *errors = append(*errors, fmt.Sprintf("%s: value %.0f < minimum %.0f", path, num, minimum))
        }
        if maximum, ok := schema["maximum"].(float64); ok && num > maximum {
            *errors = append(*errors, fmt.Sprintf("%s: value %.0f > maximum %.0f", path, num, maximum))
        }
    }
}

func main() {
    // Define schema for an order creation request
    orderSchema := map[string]interface{}{
        "type":     "object",
        "required": []interface{}{"order_id", "customer_email", "items", "total_cents"},
        "properties": map[string]interface{}{
            "order_id":       map[string]interface{}{"type": "string", "minLength": 3.0, "maxLength": 50.0},
            "customer_email": map[string]interface{}{"type": "string", "minLength": 5.0, "maxLength": 255.0},
            "items":          map[string]interface{}{"type": "array"},
            "total_cents":    map[string]interface{}{"type": "integer", "minimum": 1.0},
            "discount_pct":   map[string]interface{}{"type": "number", "minimum": 0.0, "maximum": 100.0},
        },
    }

    validator := &SchemaValidator{}

    // Test 1: valid order payload
    validJSON := `{"order_id":"ORD-001","customer_email":"alice@example.com","items":[{"sku":"WIDGET","qty":2,"price_cents":999}],"total_cents":1998,"discount_pct":10.0}`
    // Serialize to JSON wire format
    // => compact serialization: no spaces (saves bandwidth)
    fmt.Printf("Serialized size: %d bytes\n", len(validJSON))
    // => Serialized size: ~120 bytes

    // Deserialize and validate
    var parsed map[string]interface{}
    json.Unmarshal([]byte(validJSON), &parsed)
    // => parsed is a Go map identical to validOrder
    errs := validator.Validate(parsed, orderSchema)
    fmt.Printf("Valid order errors: %v\n", errs)
    // => Valid order errors: []

    // Test 2: invalid payload — missing required field, negative total
    invalidJSON := `{"order_id":"X","items":[],"total_cents":-50}`
    var parsed2 map[string]interface{}
    json.Unmarshal([]byte(invalidJSON), &parsed2)
    errs2 := validator.Validate(parsed2, orderSchema)
    fmt.Println("Invalid order errors:")
    for _, err := range errs2 {
        fmt.Printf("  - %s\n", err)
    }
    // => Invalid order errors:
    // =>   - $.order_id: string too short (min=3, got=1)
    // =>   - $.customer_email: required field is missing
    // =>   - $.total_cents: value -50 < minimum 1

    // Test 3: demonstrate JSON type coercion gotcha
    coercionJSON := `{"total_cents":"1998"}`
    var parsed3 map[string]interface{}
    json.Unmarshal([]byte(coercionJSON), &parsed3)
    errs3 := validator.Validate(parsed3, map[string]interface{}{
        "type": "object",
        "properties": map[string]interface{}{
            "total_cents": map[string]interface{}{"type": "integer"},
        },
    })
    fmt.Printf("Type coercion errors: %v\n", errs3)
    // => Type coercion errors: [$.total_cents: expected integer, got string]
    // => JSON doesn't auto-coerce — "1998" != 1998 in schema validation

    _ = strings.Contains // suppress unused import
}

Key takeaway: JSON schema validation at API boundaries rejects malformed data before it enters business logic — catching errors at ingress prevents data corruption deep in processing pipelines.

Why It Matters: Without schema validation, malformed payloads (wrong types, missing fields, out-of-range values) reach database writes where they cause constraint errors, null pointer exceptions, or silent data corruption. OpenAPI specifications use JSON Schema to define API contracts. FastAPI and Pydantic, Go’s encoding/json with struct tags, and Java’s Jackson all provide schema-aware deserialization. Validating at the boundary is the “fail fast” principle applied to data.


Example 44: Protocol Buffers Serialization

Protocol Buffers (protobuf) is a binary serialization format developed by Google. It is 3-10x smaller and 5-100x faster than JSON for equivalent data. The schema is defined in .proto files and compiled to language-specific classes — providing type safety and forward/backward compatibility via field numbering.

package main

import (
    "encoding/json"
    "fmt"
)

// Minimal protobuf encoder/decoder for demonstration
// Production: use google.golang.org/protobuf
// This implements wire types 0 (varint) and 2 (length-delimited)

func encodeVarint(value uint64) []byte {
    // Varint: uses 7 bits per byte, MSB=1 means more bytes follow
    // => small numbers (0-127) fit in 1 byte; large numbers use up to 10 bytes
    var result []byte
    for value > 0x7F {
        result = append(result, byte(value&0x7F)|0x80)
        value >>= 7
    }
    result = append(result, byte(value&0x7F))
    return result
}

func decodeVarint(buf []byte, pos int) (uint64, int) {
    var result uint64
    shift := uint(0)
    for {
        b := buf[pos]
        pos++
        result |= uint64(b&0x7F) << shift
        if b&0x80 == 0 {
            break
        }
        shift += 7
    }
    return result, pos
}

// ProtoOrder simulates generated code from:
//
//  message Order {
//    uint32 order_id   = 1;   // field number 1, wire type 0 (varint)
//    string item_sku   = 2;   // field number 2, wire type 2 (length-delimited)
//    uint32 quantity   = 3;   // field number 3, wire type 0
//    uint32 price_cents = 4;  // field number 4, wire type 0
//  }
type ProtoOrder struct {
    OrderID    uint32
    ItemSku    string
    Quantity   uint32
    PriceCents uint32
}

func (o *ProtoOrder) Serialize() []byte {
    var buf []byte
    // Each field: tag (field_number << 3 | wire_type) + value
    // Wire type 0 = varint, wire type 2 = length-delimited (bytes/string)
    for _, f := range []struct {
        fieldNum  uint64
        wireType  uint64
        value     uint64
    }{
        {1, 0, uint64(o.OrderID)},    // field 1: varint
        {3, 0, uint64(o.Quantity)},    // field 3: varint
        {4, 0, uint64(o.PriceCents)},  // field 4: varint
    } {
        tag := (f.fieldNum << 3) | f.wireType
        buf = append(buf, encodeVarint(tag)...)
        buf = append(buf, encodeVarint(f.value)...)
    }
    // String field (wire type 2): tag + length + UTF-8 bytes
    skuBytes := []byte(o.ItemSku)
    tag2 := (2 << 3) | 2 // field 2, wire type 2
    buf = append(buf, encodeVarint(uint64(tag2))...)
    buf = append(buf, encodeVarint(uint64(len(skuBytes)))...)
    buf = append(buf, skuBytes...)
    return buf
}

func DeserializeProtoOrder(data []byte) *ProtoOrder {
    fields := make(map[uint64]interface{})
    pos := 0
    for pos < len(data) {
        tag, newPos := decodeVarint(data, pos)
        pos = newPos
        fieldNum := tag >> 3
        wireType := tag & 0x07
        if wireType == 0 { // varint
            value, newPos := decodeVarint(data, pos)
            pos = newPos
            fields[fieldNum] = value
        } else if wireType == 2 { // length-delimited
            length, newPos := decodeVarint(data, pos)
            pos = newPos
            value := string(data[pos : pos+int(length)])
            pos += int(length)
            fields[fieldNum] = value
        }
    }
    order := &ProtoOrder{}
    if v, ok := fields[1].(uint64); ok {
        order.OrderID = uint32(v)
    }
    if v, ok := fields[2].(string); ok {
        order.ItemSku = v
    }
    if v, ok := fields[3].(uint64); ok {
        order.Quantity = uint32(v)
    }
    if v, ok := fields[4].(uint64); ok {
        order.PriceCents = uint32(v)
    }
    return order
}

func main() {
    order := &ProtoOrder{OrderID: 7821, ItemSku: "WIDGET-A-PREMIUM", Quantity: 5, PriceCents: 2999}

    // Serialize to protobuf binary
    protoBytes := order.Serialize()
    fmt.Printf("Protobuf size: %d bytes\n", len(protoBytes))
    // => Protobuf size: ~25 bytes

    // Equivalent JSON representation
    jsonRepr := map[string]interface{}{
        "order_id":    order.OrderID,
        "item_sku":    order.ItemSku,
        "quantity":    order.Quantity,
        "price_cents": order.PriceCents,
    }
    jsonBytes, _ := json.Marshal(jsonRepr)
    fmt.Printf("JSON size:     %d bytes\n", len(jsonBytes))
    // => JSON size: ~70 bytes
    // => Protobuf is ~2.8x smaller for this payload

    fmt.Printf("Size reduction: %.0f%%\n", (1-float64(len(protoBytes))/float64(len(jsonBytes)))*100)
    // => Size reduction: ~64%

    // Round-trip: serialize then deserialize
    decoded := DeserializeProtoOrder(protoBytes)
    fmt.Printf("Decoded: order_id=%d, sku=%s, qty=%d, price=%d\n",
        decoded.OrderID, decoded.ItemSku, decoded.Quantity, decoded.PriceCents)
    // => Decoded: order_id=7821, sku=WIDGET-A-PREMIUM, qty=5, price=2999
}

Key takeaway: Protobuf’s binary encoding and field-number schema make it 2-10x smaller than JSON and significantly faster to encode/decode — critical for high-throughput inter-service communication where JSON parsing overhead accumulates.

Why It Matters: At Google scale, 1 billion API calls/day with 10KB JSON payloads = 10TB of data. Switching to protobuf reduces this to ~2TB, cutting network costs and latency. gRPC uses protobuf as its default wire format. Kafka topics often use protobuf or Avro instead of JSON. The schema evolution story is compelling — adding a new field with a new field number is backward compatible; old consumers ignore unknown fields.


Database Indexing

Example 45: B-Tree Index vs Sequential Scan

A database index is a separate data structure that maps column values to row locations, enabling O(log N) lookups instead of O(N) full table scans. B-tree indexes are the default in PostgreSQL, MySQL, and SQLite. Understanding when indexes help (selective queries) versus when they hurt (small tables, high-write tables) is essential.

package main

import (
    "fmt"
    "math/rand"
    "sort"
    "time"
)

type IndexEntry struct {
    Value int // indexed column value (e.g., user_id)
    RowID int // pointer to actual row (e.g., heap tuple ID)
}

// BTreeIndexSimulator simulates a simplified B-tree index with sorted slice for binary search.
type BTreeIndexSimulator struct {
    // Sorted list of (value, rowID) pairs — maintained in value order
    // => real B-trees use pages of sorted key-pointer pairs for disk efficiency
    entries []IndexEntry
}

func (b *BTreeIndexSimulator) Insert(value, rowID int) {
    entry := IndexEntry{Value: value, RowID: rowID}
    // Binary search to find insertion point — maintains sorted order
    idx := sort.Search(len(b.entries), func(i int) bool {
        return b.entries[i].Value > value || (b.entries[i].Value == value && b.entries[i].RowID >= rowID)
    })
    b.entries = append(b.entries, IndexEntry{})
    copy(b.entries[idx+1:], b.entries[idx:])
    b.entries[idx] = entry
}

func (b *BTreeIndexSimulator) PointLookup(value int) []int {
    // Find all rowIDs where indexed column == value. O(log N).
    // Binary search to find leftmost entry with this value
    lo := sort.Search(len(b.entries), func(i int) bool {
        return b.entries[i].Value >= value
    })
    var rowIDs []int
    for lo < len(b.entries) && b.entries[lo].Value == value {
        rowIDs = append(rowIDs, b.entries[lo].RowID)
        lo++
    }
    return rowIDs
    // => O(log N) to find start position + O(k) to collect k matching entries
}

func (b *BTreeIndexSimulator) RangeLookup(low, high int) []int {
    // Find all rowIDs where low <= value <= high. O(log N + k).
    lo := sort.Search(len(b.entries), func(i int) bool {
        return b.entries[i].Value >= low
    })
    var rowIDs []int
    for lo < len(b.entries) && b.entries[lo].Value <= high {
        rowIDs = append(rowIDs, b.entries[lo].RowID)
        lo++
    }
    return rowIDs
    // => B-tree range scans are efficient because entries are sorted
}

type TableSimulator struct {
    rows  []map[string]interface{}
    index *BTreeIndexSimulator
}

func NewTableSimulator(rows []map[string]interface{}) *TableSimulator {
    t := &TableSimulator{rows: rows, index: &BTreeIndexSimulator{}}
    // Build B-tree index on 'user_id' column
    for rowID, row := range rows {
        t.index.Insert(row["user_id"].(int), rowID)
    }
    return t
}

func (t *TableSimulator) SequentialScan(userID int) []map[string]interface{} {
    // Full table scan: O(N) — reads every row to find matches.
    var results []map[string]interface{}
    for _, row := range t.rows {
        if row["user_id"].(int) == userID {
            results = append(results, row)
        }
    }
    return results
}

func (t *TableSimulator) IndexScan(userID int) []map[string]interface{} {
    // Index + heap fetch: O(log N + k) — reads only matching rows.
    rowIDs := t.index.PointLookup(userID)
    // => O(log N) to find rowIDs in index
    var results []map[string]interface{}
    for _, rid := range rowIDs {
        results = append(results, t.rows[rid])
    }
    // => O(k) heap fetches, where k = number of matching rows
    return results
}

func (t *TableSimulator) IndexRangeScan(low, high int) []map[string]interface{} {
    // Range query using index: efficiently finds rows in [low, high].
    rowIDs := t.index.RangeLookup(low, high)
    var results []map[string]interface{}
    for _, rid := range rowIDs {
        results = append(results, t.rows[rid])
    }
    return results
}

func main() {
    // Generate 100,000 rows simulating an orders table
    rand.Seed(42)
    numRows := 100000
    rows := make([]map[string]interface{}, numRows)
    statuses := []string{"pending", "complete", "failed"}
    for i := 0; i < numRows; i++ {
        rows[i] = map[string]interface{}{
            "row_id":  i,
            "user_id": rand.Intn(10000) + 1, // 10k distinct users
            "amount":  10.0 + rand.Float64()*490.0,
            "status":  statuses[rand.Intn(3)],
        }
    }
    table := NewTableSimulator(rows)

    targetUser := 5000

    // Sequential scan: O(N)
    t0 := time.Now()
    seqResult := table.SequentialScan(targetUser)
    seqTime := time.Since(t0)

    // Index scan: O(log N + k)
    t0 = time.Now()
    idxResult := table.IndexScan(targetUser)
    idxTime := time.Since(t0)

    fmt.Printf("Rows in table: %d\n", numRows)
    fmt.Printf("Matching rows for user_id=%d: %d\n", targetUser, len(seqResult))
    fmt.Printf("Sequential scan:  %.2fms\n", float64(seqTime.Microseconds())/1000)
    fmt.Printf("Index scan:       %.4fms\n", float64(idxTime.Microseconds())/1000)
    if idxTime > 0 {
        fmt.Printf("Speedup: %.0fx\n", float64(seqTime)/float64(idxTime))
    }
    // => Sequential scan:  ~15ms
    // => Index scan:       ~0.05ms
    // => Speedup: ~300x for 100k rows

    // Range query: find users 4000-4010
    rangeResult := table.IndexRangeScan(4000, 4010)
    fmt.Printf("\nRange [4000,4010]: %d rows found via index\n", len(rangeResult))
    // => efficiently returns only rows in that user_id range

    _ = idxResult // suppress unused
}

Key takeaway: B-tree indexes reduce point queries from O(N) sequential scans to O(log N) binary searches — providing 100-1000x speedup for selective queries on large tables.

Why It Matters: A query without an index on a million-row table does a full table scan — reading every page from disk. At 8KB per page and 10 rows per page, that’s 800MB of I/O. The same query with an index reads the B-tree root, a few index pages, and only the matching heap pages — often under 1MB. This is why database performance tuning starts with EXPLAIN ANALYZE to identify missing indexes.


Connection Pooling

Example 46: Database Connection Pool

Opening a new database connection takes 20-200ms (TCP handshake, TLS, authentication, session setup). A connection pool maintains a set of pre-established connections that requests borrow and return. This reduces per-request overhead to microseconds and bounds the maximum load on the database server.

package main

import (
    "fmt"
    "sync"
    "time"
)

type DBConnection struct {
    ConnID          int
    Host            string
    InUse           bool
    CreatedAt       time.Time
    QueriesExecuted int
}

func (c *DBConnection) Execute(sql string) map[string]interface{} {
    // Simulate query execution latency
    time.Sleep(5 * time.Millisecond) // 5ms query time
    c.QueriesExecuted++
    return map[string]interface{}{
        "rows":  []map[string]string{{"id": "1", "result": "ok"}},
        "query": sql,
    }
}

type ConnectionPool struct {
    host        string
    minSize     int
    maxSize     int
    idle        chan *DBConnection
    allConns    []*DBConnection
    mu          sync.Mutex
    connCounter int
}

func NewConnectionPool(host string, minSize, maxSize int) *ConnectionPool {
    p := &ConnectionPool{
        host:    host,
        minSize: minSize,
        maxSize: maxSize,
        idle:    make(chan *DBConnection, maxSize),
    }
    // Pre-warm: create minSize connections at pool startup
    // => avoids cold start latency on first requests
    for i := 0; i < minSize; i++ {
        conn := p.createConnection()
        p.idle <- conn
    }
    return p
}

func (p *ConnectionPool) createConnection() *DBConnection {
    // Opening a real connection: TCP handshake + TLS + DB auth = 20-200ms
    // => simulated as 20ms here
    time.Sleep(20 * time.Millisecond)
    p.mu.Lock()
    p.connCounter++
    conn := &DBConnection{
        ConnID:    p.connCounter,
        Host:      p.host,
        CreatedAt: time.Now(),
    }
    p.allConns = append(p.allConns, conn)
    total := len(p.allConns)
    p.mu.Unlock()
    fmt.Printf("[POOL] Created connection #%d (total=%d)\n", conn.ConnID, total)
    return conn
}

func (p *ConnectionPool) Acquire(timeout time.Duration) (*DBConnection, error) {
    // Try to get an idle connection first
    select {
    case conn := <-p.idle:
        // => got idle connection immediately — no creation overhead
        conn.InUse = true
        return conn, nil
    default:
    }

    // No idle connections: create a new one if under maxSize
    p.mu.Lock()
    underLimit := len(p.allConns) < p.maxSize
    p.mu.Unlock()

    if underLimit {
        conn := p.createConnection()
        conn.InUse = true
        return conn, nil
    }

    // At max: wait for a connection to be returned
    // => prevents overwhelming the database with too many connections
    fmt.Println("[POOL] Pool exhausted — waiting for available connection")
    select {
    case conn := <-p.idle:
        conn.InUse = true
        return conn, nil
    case <-time.After(timeout):
        return nil, fmt.Errorf("no connection available within %s", timeout)
    }
}

func (p *ConnectionPool) Release(conn *DBConnection) {
    // Always return connection to pool
    conn.InUse = false
    p.idle <- conn
    // => connection returned to idle channel for next Acquire()
}

func (p *ConnectionPool) Stats() map[string]int {
    idleCount := len(p.idle)
    p.mu.Lock()
    total := len(p.allConns)
    p.mu.Unlock()
    return map[string]int{
        "total_connections": total,
        "idle":              idleCount,
        "in_use":            total - idleCount,
        "max_size":          p.maxSize,
    }
}

func main() {
    // Simulate 5 concurrent requests hitting the pool
    pool := NewConnectionPool("db.internal:5432", 2, 5)
    // => [POOL] Created connection #1 (total=1)
    // => [POOL] Created connection #2 (total=2)

    type queryResult struct {
        queryNum int
        connID   int
        result   string
    }
    var results []queryResult
    var resultsMu sync.Mutex

    runQuery := func(queryNum int, wg *sync.WaitGroup) {
        defer wg.Done()
        conn, err := pool.Acquire(5 * time.Second)
        if err != nil {
            fmt.Printf("  Query %d: error: %s\n", queryNum, err)
            return
        }
        defer pool.Release(conn)
        // Inside: conn is exclusively ours until we Release
        result := conn.Execute(fmt.Sprintf("SELECT * FROM orders WHERE id = %d", queryNum))
        rows := result["rows"].([]map[string]string)
        resultsMu.Lock()
        results = append(results, queryResult{queryNum, conn.ConnID, rows[0]["result"]})
        resultsMu.Unlock()
    }

    // Launch 5 concurrent goroutines — pool starts with 2, will grow to handle demand
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go runQuery(i, &wg)
    }
    wg.Wait()

    fmt.Println("\nQuery results:")
    for _, r := range results {
        fmt.Printf("  Query %d: served by conn #%d -> %s\n", r.queryNum, r.connID, r.result)
    }

    fmt.Printf("\nPool stats: %v\n", pool.Stats())
    // => Pool stats: map[idle:5 in_use:0 max_size:5 total_connections:5]
    // => Pool grew from 2 to 5 connections to handle concurrent demand
    // => All connections returned to idle after queries complete
}

Key takeaway: Connection pooling amortizes the 20-200ms connection setup cost across many requests — a 5-connection pool serves thousands of requests per second by reusing the same connections.

Why It Matters: Without pooling, each HTTP request that touches the database opens a connection (20ms overhead) and closes it — making 50-requests/second throughput require 1000ms of connection overhead per second. PostgreSQL’s max_connections defaults to 100 — without pooling, 100 concurrent Node.js workers exhaust the server. PgBouncer, HikariCP (Java), and SQLAlchemy’s pool handle this in production.


Session Management

Example 47: Distributed Session with Token-Based Authentication

Stateful sessions store user data on the server (memory or Redis) keyed by a session ID. Stateless JWTs embed user data in a signed token that any server can verify without shared state. The trade-off is revocability: sessions can be invalidated instantly; JWTs cannot until they expire.

package main

import (
    "crypto/hmac"
    "crypto/rand"
    "crypto/sha256"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "strings"
    "time"
)

// --- Server-Side Session Store ---

type SessionStore struct {
    sessions map[string]map[string]interface{}
    ttl      int
}

func NewSessionStore(ttlSeconds int) *SessionStore {
    return &SessionStore{
        sessions: make(map[string]map[string]interface{}),
        ttl:      ttlSeconds,
    }
}

func (s *SessionStore) Create(userID string, data map[string]interface{}) string {
    // Generate cryptographically random session ID — 32 bytes = 256 bits of entropy
    // => impossible to guess; not derived from user data
    b := make([]byte, 32)
    rand.Read(b)
    sessionID := base64.URLEncoding.EncodeToString(b)
    if data == nil {
        data = map[string]interface{}{}
    }
    s.sessions[sessionID] = map[string]interface{}{
        "user_id":    userID,
        "data":       data,
        "created_at": float64(time.Now().Unix()),
        "expires_at": float64(time.Now().Unix()) + float64(s.ttl),
    }
    return sessionID
    // => session ID is returned to client as httponly cookie
}

func (s *SessionStore) Get(sessionID string) map[string]interface{} {
    session, ok := s.sessions[sessionID]
    if !ok {
        return nil // session not found
    }
    if float64(time.Now().Unix()) > session["expires_at"].(float64) {
        // Expired: delete from store and return nil
        delete(s.sessions, sessionID)
        return nil
    }
    return session
}

func (s *SessionStore) Invalidate(sessionID string) bool {
    // Immediate invalidation: server deletes session -> client's cookie becomes useless
    // => this is the key advantage over JWTs: instant revocation
    if _, ok := s.sessions[sessionID]; ok {
        delete(s.sessions, sessionID)
        return true
    }
    return false
}

// --- Stateless JWT (simplified, HMAC-SHA256) ---

type JWTService struct {
    secret []byte
    expiry int
}

func NewJWTService(secret string, expirySeconds int) *JWTService {
    return &JWTService{secret: []byte(secret), expiry: expirySeconds}
}

func (j *JWTService) base64url(data []byte) string {
    return strings.TrimRight(base64.URLEncoding.EncodeToString(data), "=")
}

func (j *JWTService) sign(data string) string {
    mac := hmac.New(sha256.New, j.secret)
    mac.Write([]byte(data))
    return j.base64url(mac.Sum(nil))
}

func (j *JWTService) CreateToken(userID string, roles []string) string {
    headerJSON, _ := json.Marshal(map[string]string{"alg": "HS256", "typ": "JWT"})
    header := j.base64url(headerJSON)
    payloadJSON, _ := json.Marshal(map[string]interface{}{
        "sub":   userID,
        "roles": roles,
        "iat":   time.Now().Unix(),
        "exp":   time.Now().Unix() + int64(j.expiry),
    })
    payload := j.base64url(payloadJSON)
    signature := j.sign(header + "." + payload)
    token := header + "." + payload + "." + signature
    // => token is self-contained: any server can verify it with the secret
    // => no database lookup needed for validation
    return token
}

func (j *JWTService) VerifyToken(token string) map[string]interface{} {
    parts := strings.Split(token, ".")
    if len(parts) != 3 {
        return nil // malformed token
    }
    header, payload, signature := parts[0], parts[1], parts[2]
    // Recompute signature and compare — prevents tampering
    expectedSig := j.sign(header + "." + payload)
    if !hmac.Equal([]byte(signature), []byte(expectedSig)) {
        return nil // signature invalid — token was tampered with or wrong secret
    }
    // Decode claims
    padded := payload
    if m := len(padded) % 4; m != 0 {
        padded += strings.Repeat("=", 4-m)
    }
    decoded, err := base64.URLEncoding.DecodeString(padded)
    if err != nil {
        return nil
    }
    var claims map[string]interface{}
    json.Unmarshal(decoded, &claims)
    if float64(time.Now().Unix()) > claims["exp"].(float64) {
        return nil // token expired — client must re-authenticate
    }
    return claims
}

func main() {
    // --- Comparison ---

    fmt.Println("=== Server-Side Session ===")
    store := NewSessionStore(30)

    // Login: create session
    sessionID := store.Create("user-alice", map[string]interface{}{
        "cart_items": 3, "preferred_lang": "en",
    })
    fmt.Printf("Session created: %s...\n", sessionID[:20])
    // => sessionID is random, e.g., "zxK9mP2wLvQ3..."

    // Subsequent request: validate session
    session := store.Get(sessionID)
    data := session["data"].(map[string]interface{})
    fmt.Printf("Session user: %s, cart: %v\n", session["user_id"], data["cart_items"])
    // => Session user: user-alice, cart: 3

    // Logout: instant invalidation — session cookie is useless immediately
    store.Invalidate(sessionID)
    sessionAfter := store.Get(sessionID)
    fmt.Printf("After invalidation: %v\n", sessionAfter)
    // => After invalidation: <nil> (server no longer recognizes the session ID)

    fmt.Println("\n=== JWT (Stateless) ===")
    jwt := NewJWTService("super-secret-key-256-bits-minimum", 30)

    // Login: create JWT
    token := jwt.CreateToken("user-bob", []string{"user", "admin"})
    fmt.Printf("JWT created: %s...\n", token[:50])
    // => JWT is three base64url segments joined by dots

    // Subsequent request: verify JWT — no database lookup
    claims := jwt.VerifyToken(token)
    fmt.Printf("JWT claims: sub=%s, roles=%v\n", claims["sub"], claims["roles"])
    // => JWT claims: sub=user-bob, roles=[user admin]

    // Tampered JWT: change one character
    tampered := token[:len(token)-1]
    if token[len(token)-1] != 'X' {
        tampered += "X"
    } else {
        tampered += "Y"
    }
    result := jwt.VerifyToken(tampered)
    fmt.Printf("Tampered JWT: %v\n", result)
    // => Tampered JWT: <nil> (HMAC mismatch detected)

    // JWT revocation problem: cannot invalidate before expiry
    // => to "logout" a JWT user, must maintain a blocklist (defeats stateless advantage)
    // => or use very short expiry (5-15 minutes) with refresh token rotation
    fmt.Println("\nJWT revocation: no built-in way to invalidate before exp")
    fmt.Println("Solution: short expiry (15min) + refresh token stored server-side")
}

Key takeaway: Sessions provide instant revocability at the cost of server-side state; JWTs are stateless and scalable but cannot be revoked until expiry — choose based on whether immediate logout is a security requirement.

Why It Matters: Stateless JWTs enable horizontal scaling without sticky sessions or shared session stores — any server can validate any token. This is why JWTs dominate microservices auth (OAuth 2.0 / OIDC). However, when a user’s account is compromised or terminated, JWTs remain valid until expiry. Hybrid approaches (short JWT expiry + server-side refresh token rotation) balance scalability with revocability.


Notification Systems

Example 48: Multi-Channel Notification Dispatcher

A notification system routes messages to users across multiple channels (email, SMS, push) based on user preferences and message priority. A fan-out dispatcher sends to all applicable channels, while a fallback dispatcher tries channels in priority order until one succeeds.

    graph TD
    Event["Notification Event<br/>type=order_shipped"]:::blue
    Dispatcher["Dispatcher<br/>Routes by preference"]:::orange
    Email["Email Channel<br/>alice@example.com"]:::teal
    SMS["SMS Channel<br/>+1-555-0100"]:::teal
    Push["Push Channel<br/>device_token_xyz"]:::teal

    Event --> Dispatcher
    Dispatcher -->|preference: email| Email
    Dispatcher -->|preference: sms| SMS
    Dispatcher -->|preference: push| Push

    classDef blue fill:#0173B2,stroke:#000,color:#fff
    classDef orange fill:#DE8F05,stroke:#000,color:#fff
    classDef teal fill:#029E73,stroke:#000,color:#fff
  
package main

import (
    "fmt"
    "strings"
)

type Channel int

const (
    ChannelEmail Channel = iota
    ChannelSMS
    ChannelPush
)

func (c Channel) String() string {
    switch c {
    case ChannelEmail:
        return "email"
    case ChannelSMS:
        return "sms"
    case ChannelPush:
        return "push"
    }
    return "unknown"
}

type Priority int

const (
    PriorityLow      Priority = iota // marketing, newsletters — fanout acceptable
    PriorityHigh                      // order confirmations — at least one channel must succeed
    PriorityCritical                  // security alerts — all available channels
)

type NotificationEvent struct {
    EventType string
    Title     string
    Body      string
    Priority  Priority
    Metadata  map[string]string
}

type UserPreferences struct {
    UserID    string
    Channels  []Channel // Ordered preference list: first = preferred channel
    Email     string
    Phone     string
    PushToken string
}

// Channel senders — return (success, message)
type EmailSender struct{}

func (e *EmailSender) Send(to, title, body string) (bool, string) {
    if to == "" || !strings.Contains(to, "@") {
        return false, fmt.Sprintf("Invalid email: %s", to)
    }
    fmt.Printf("  [EMAIL] -> %s: %s\n", to, title)
    return true, fmt.Sprintf("Email delivered to %s", to)
}

type SMSSender struct{}

func (s *SMSSender) Send(phone, body string) (bool, string) {
    if phone == "" || !strings.HasPrefix(phone, "+") {
        return false, fmt.Sprintf("Invalid phone: %s", phone)
    }
    // SMS body truncated to 160 chars (single SMS segment)
    truncated := body
    if len(truncated) > 160 {
        truncated = truncated[:160]
    }
    fmt.Printf("  [SMS]   -> %s: %s\n", phone, truncated)
    return true, fmt.Sprintf("SMS delivered to %s", phone)
}

type PushSender struct{}

func (p *PushSender) Send(token, title, body string) (bool, string) {
    if token == "" {
        return false, "No push token"
    }
    display := token
    if len(display) > 16 {
        display = display[:16] + "..."
    }
    fmt.Printf("  [PUSH]  -> %s: %s\n", display, title)
    return true, fmt.Sprintf("Push delivered to %s", display)
}

type NotificationDispatcher struct {
    email *EmailSender
    sms   *SMSSender
    push  *PushSender
}

func NewNotificationDispatcher() *NotificationDispatcher {
    return &NotificationDispatcher{
        email: &EmailSender{},
        sms:   &SMSSender{},
        push:  &PushSender{},
    }
}

func (d *NotificationDispatcher) sendViaChannel(ch Channel, event *NotificationEvent, prefs *UserPreferences) (bool, string) {
    switch ch {
    case ChannelEmail:
        if prefs.Email != "" {
            return d.email.Send(prefs.Email, event.Title, event.Body)
        }
    case ChannelSMS:
        if prefs.Phone != "" {
            return d.sms.Send(prefs.Phone, event.Body)
        }
    case ChannelPush:
        if prefs.PushToken != "" {
            return d.push.Send(prefs.PushToken, event.Title, event.Body)
        }
    }
    return false, fmt.Sprintf("Channel %s not configured for user %s", ch, prefs.UserID)
}

func (d *NotificationDispatcher) DispatchFanout(event *NotificationEvent, prefs *UserPreferences) map[string]interface{} {
    // Send to ALL configured channels — for CRITICAL alerts.
    results := map[string]interface{}{}
    sent := 0
    for _, ch := range prefs.Channels {
        ok, msg := d.sendViaChannel(ch, event, prefs)
        results[ch.String()] = map[string]interface{}{"success": ok, "message": msg}
        if ok {
            sent++
        }
    }
    return map[string]interface{}{"strategy": "fanout", "sent": sent, "results": results}
}

func (d *NotificationDispatcher) DispatchFallback(event *NotificationEvent, prefs *UserPreferences) map[string]interface{} {
    // Try channels in preference order; stop at first success — for HIGH priority.
    for _, ch := range prefs.Channels {
        ok, msg := d.sendViaChannel(ch, event, prefs)
        if ok {
            return map[string]interface{}{
                "strategy": "fallback", "sent": 1, "channel": ch.String(), "message": msg,
            }
        }
    }
    return map[string]interface{}{"strategy": "fallback", "sent": 0, "message": "All channels failed"}
}

func (d *NotificationDispatcher) Dispatch(event *NotificationEvent, prefs *UserPreferences) map[string]interface{} {
    switch event.Priority {
    case PriorityCritical:
        return d.DispatchFanout(event, prefs)
    case PriorityHigh:
        return d.DispatchFallback(event, prefs)
    default:
        // LOW priority: use only preferred channel (first in list)
        if len(prefs.Channels) > 0 {
            ch := prefs.Channels[0]
            ok, msg := d.sendViaChannel(ch, event, prefs)
            sent := 0
            if ok {
                sent = 1
            }
            return map[string]interface{}{"strategy": "preferred_only", "sent": sent, "message": msg}
        }
        return map[string]interface{}{"strategy": "preferred_only", "sent": 0, "message": "No channels"}
    }
}

func main() {
    dispatcher := NewNotificationDispatcher()
    user := &UserPreferences{
        UserID:    "user-alice",
        Channels:  []Channel{ChannelPush, ChannelEmail, ChannelSMS},
        Email:     "alice@example.com",
        Phone:     "+15550100",
        PushToken: "device_token_abc123xyz",
    }

    fmt.Println("=== CRITICAL: Security Alert (fanout - all channels) ===")
    securityEvent := &NotificationEvent{
        EventType: "security_alert",
        Title:     "New login from unknown device",
        Body:      "We detected a login from IP 192.168.1.1 in New York",
        Priority:  PriorityCritical,
        Metadata:  map[string]string{"ip": "192.168.1.1", "location": "New York"},
    }
    result := dispatcher.Dispatch(securityEvent, user)
    fmt.Printf("  Sent to %v channels via %s\n", result["sent"], result["strategy"])
    // => [PUSH] -> device_token_abc1...: New login from unknown device
    // => [EMAIL] -> alice@example.com: New login from unknown device
    // => [SMS]   -> +15550100: We detected a login...
    // => Sent to 3 channels via fanout

    fmt.Println("\n=== HIGH: Order Shipped (fallback - first success) ===")
    orderEvent := &NotificationEvent{
        EventType: "order_shipped",
        Title:     "Your order has shipped!",
        Body:      "Order ORD-001 shipped via FedEx, tracking: 123456789",
        Priority:  PriorityHigh,
        Metadata:  map[string]string{"order_id": "ORD-001"},
    }
    result = dispatcher.Dispatch(orderEvent, user)
    ch, _ := result["channel"].(string)
    fmt.Printf("  Sent via %s (strategy=%s)\n", ch, result["strategy"])
    // => [PUSH] -> device_token_abc1...: Your order has shipped!
    // => Sent via push (strategy=fallback) — stopped after first success
}

Key takeaway: Fan-out sends to all channels for critical alerts; fallback tries channels in preference order for routine notifications — matching delivery guarantee to business importance.

Why It Matters: Notification systems underpin user engagement across every digital product. Systems like Twilio, Firebase Cloud Messaging, and SendGrid expose channel-specific APIs. At scale, notification dispatchers use queues (SQS/Kafka) to decouple high-volume event ingestion from channel-specific delivery — a single “order.shipped” event fanout to millions of users cannot happen synchronously. Priority routing also manages cost: push is near-free, SMS costs $0.01/message.


Search Systems

Example 49: Inverted Index for Full-Text Search

A search engine’s core data structure is an inverted index: a mapping from each word to the list of documents containing it. Given a query, the search engine finds the intersection of posting lists for query terms and ranks results by relevance (TF-IDF). This is the foundation of Elasticsearch, Solr, and Lucene.

package main

import (
    "fmt"
    "math"
    "regexp"
    "sort"
    "strings"
)

type Document struct {
    DocID int
    Title string
    Body  string
}

func (d *Document) Text() string {
    return d.Title + " " + d.Body
}

type PostingEntry struct {
    DocID     int
    TermFreq  int   // how many times this term appears in the document
    Positions []int // positions where term appears (for phrase queries)
}

type InvertedIndex struct {
    index      map[string][]PostingEntry
    documents  map[int]*Document
    docLengths map[int]int
    docCount   int
}

func NewInvertedIndex() *InvertedIndex {
    return &InvertedIndex{
        index:      make(map[string][]PostingEntry),
        documents:  make(map[int]*Document),
        docLengths: make(map[int]int),
    }
}

var tokenRegex = regexp.MustCompile(`[a-z0-9]+`)

func (idx *InvertedIndex) tokenize(text string) []string {
    // Lowercase, split on non-alphanumeric, filter short tokens
    matches := tokenRegex.FindAllString(strings.ToLower(text), -1)
    // => removes punctuation, normalizes case
    // Production: also apply stemming (run/running -> run) and stop-word removal
    var tokens []string
    for _, t := range matches {
        if len(t) > 2 {
            tokens = append(tokens, t)
        }
    }
    return tokens
}

func (idx *InvertedIndex) AddDocument(doc *Document) {
    idx.documents[doc.DocID] = doc
    idx.docCount++
    tokens := idx.tokenize(doc.Text())
    idx.docLengths[doc.DocID] = len(tokens)

    // Count term frequencies and positions
    termPositions := make(map[string][]int)
    for pos, token := range tokens {
        termPositions[token] = append(termPositions[token], pos)
    }
    // Add to inverted index
    for term, positions := range termPositions {
        entry := PostingEntry{DocID: doc.DocID, TermFreq: len(positions), Positions: positions}
        idx.index[term] = append(idx.index[term], entry)
    }
}

func (idx *InvertedIndex) idf(term string) float64 {
    // IDF (Inverse Document Frequency): rare terms are more discriminative
    // => IDF = log(N / df), where N=total docs, df=docs containing term
    df := len(idx.index[term])
    if df == 0 {
        return 0.0
    }
    return math.Log(float64(idx.docCount) / float64(df))
}

func (idx *InvertedIndex) tfIdf(term string, docID, termFreq int) float64 {
    // TF = termFreq / docLength (normalized)
    docLen := idx.docLengths[docID]
    if docLen == 0 {
        docLen = 1
    }
    tf := float64(termFreq) / float64(docLen)
    // TF-IDF = TF * IDF: high for terms that appear often in this doc but rarely elsewhere
    return tf * idx.idf(term)
}

func (idx *InvertedIndex) Search(query string, topK int) []map[string]interface{} {
    queryTerms := idx.tokenize(query)
    if len(queryTerms) == 0 {
        return nil
    }

    // Gather candidate docIDs: union of all posting lists
    candidateScores := make(map[int]float64)
    for _, term := range queryTerms {
        for _, entry := range idx.index[term] {
            // Accumulate TF-IDF scores across all query terms
            score := idx.tfIdf(term, entry.DocID, entry.TermFreq)
            candidateScores[entry.DocID] += score
            // => documents matching more query terms get higher cumulative scores
        }
    }

    // Sort by score descending, return topK
    type scored struct {
        docID int
        score float64
    }
    var ranked []scored
    for id, s := range candidateScores {
        ranked = append(ranked, scored{id, s})
    }
    sort.Slice(ranked, func(i, j int) bool {
        return ranked[i].score > ranked[j].score
    })

    var results []map[string]interface{}
    for i, r := range ranked {
        if i >= topK {
            break
        }
        doc := idx.documents[r.docID]
        results = append(results, map[string]interface{}{
            "doc_id": r.docID,
            "title":  doc.Title,
            "score":  math.Round(r.score*10000) / 10000,
        })
    }
    return results
}

func main() {
    // Build a small document corpus
    docs := []*Document{
        {1, "Introduction to Python", "Python is a high-level programming language used for web development machine learning and automation"},
        {2, "Python Machine Learning", "Machine learning with Python using scikit-learn tensorflow and neural networks for data science"},
        {3, "Web Development with Django", "Django is a Python web framework for building scalable web applications with database ORM routing"},
        {4, "Database Design Principles", "Relational database design normalization indexing transactions and query optimization with SQL"},
        {5, "System Design Interviews", "System design patterns scalability load balancing caching database sharding microservices"},
    }

    index := NewInvertedIndex()
    for _, doc := range docs {
        index.AddDocument(doc)
    }

    fmt.Println("=== Search: 'python machine learning' ===")
    results := index.Search("python machine learning", 3)
    for _, r := range results {
        fmt.Printf("  [%.4f] %s\n", r["score"], r["title"])
    }
    // => [0.xxxx] Python Machine Learning   (highest: has both terms densely)
    // => [0.xxxx] Introduction to Python    (has 'python' but not 'machine learning')
    // => [0.xxxx] Web Development with Django (has 'python' in body)

    fmt.Println("\n=== Search: 'database indexing' ===")
    results2 := index.Search("database indexing", 3)
    for _, r := range results2 {
        fmt.Printf("  [%.4f] %s\n", r["score"], r["title"])
    }
    // => [0.xxxx] Database Design Principles (has both terms)
    // => [0.xxxx] System Design Interviews  (has 'database' in context)
}

Key takeaway: The inverted index maps terms to document posting lists, enabling O(1) term lookup and O(k) result collection — making full-text search over millions of documents feasible with TF-IDF ranking.

Why It Matters: Without an inverted index, a search query scans every document’s text — O(N*M) where N=documents, M=doc length. With an inverted index, each query term is a direct lookup into a pre-built posting list. Elasticsearch (built on Lucene) extends this with distributed inverted indexes, BM25 ranking, fuzzy matching, and aggregations. Every search box in a production application uses this fundamental data structure.


Blob Storage

Example 50: Object Storage with Metadata Indexing

Blob (Binary Large Object) storage systems store files as opaque byte sequences identified by keys. Unlike filesystems, object storage provides flat namespaces with rich metadata, versioning, and HTTP access. Internally, objects are stored on distributed storage nodes with metadata (size, content-type, checksums) tracked in a separate database.

package main

import (
    "crypto/md5"
    "crypto/sha256"
    "fmt"
    "sort"
    "time"

)


type ObjectMetadata struct {
    Key            string
    Bucket         string
    SizeBytes      int
    ContentType    string
    ETag           string
    VersionID      string
    CreatedAt      float64
    LastModified   float64
    CustomMetadata map[string]string
}

type StoredObject struct {
    Metadata *ObjectMetadata
    Content  []byte
}

type ObjectStore struct {
    contentStore  map[string][]byte                     // versionID -> bytes
    metadataIndex map[string][]*ObjectMetadata          // "bucket/key" -> versions (newest first)
}

func NewObjectStore() *ObjectStore {
    return &ObjectStore{
        contentStore:  make(map[string][]byte),
        metadataIndex: make(map[string][]*ObjectMetadata),
    }
}

func (s *ObjectStore) Put(bucket, key string, content []byte, contentType string, metadata map[string]string) *ObjectMetadata {
    // Compute ETag (content hash) for integrity verification
    // => clients can verify they received the exact bytes they uploaded
    hash := md5.Sum(content)
    etag := fmt.Sprintf("%x", hash)
    versionID := fmt.Sprintf("%d", time.Now().UnixNano())
    now := float64(time.Now().Unix())
    if metadata == nil {
        metadata = map[string]string{}
    }
    meta := &ObjectMetadata{
        Key: key, Bucket: bucket, SizeBytes: len(content),
        ContentType: contentType, ETag: etag, VersionID: versionID,
        CreatedAt: now, LastModified: now, CustomMetadata: metadata,
    }
    // Store content indexed by versionID (not key) to support versioning
    // => old version content is still accessible by old versionID
    s.contentStore[versionID] = content
    indexKey := bucket + "/" + key
    // Prepend new version — latest is always [0]
    s.metadataIndex[indexKey] = append([]*ObjectMetadata{meta}, s.metadataIndex[indexKey]...)
    fmt.Printf("[STORE] PUT s3://%s/%s (%d bytes, etag=%s..., v=%s...)\n",
        bucket, key, len(content), etag[:8], versionID[:8])
    return meta
}

func (s *ObjectStore) Get(bucket, key, versionID string) *StoredObject {
    indexKey := bucket + "/" + key
    versions := s.metadataIndex[indexKey]
    if len(versions) == 0 {
        return nil
    }
    var meta *ObjectMetadata
    if versionID != "" {
        for _, v := range versions {
            if v.VersionID == versionID {
                meta = v
                break
            }
        }
    } else {
        meta = versions[0] // latest version
    }
    if meta == nil {
        return nil
    }
    content := s.contentStore[meta.VersionID]
    return &StoredObject{Metadata: meta, Content: content}
}

func (s *ObjectStore) Delete(bucket, key string) bool {
    indexKey := bucket + "/" + key
    if _, ok := s.metadataIndex[indexKey]; !ok {
        return false
    }
    delete(s.metadataIndex, indexKey)
    fmt.Printf("[STORE] DELETE s3://%s/%s\n", bucket, key)
    return true
}

func (s *ObjectStore) ListObjects(bucket, prefix string) []*ObjectMetadata {
    var results []*ObjectMetadata
    for indexKey, versions := range s.metadataIndex {
        expected := bucket + "/" + prefix
        if len(versions) > 0 && len(indexKey) >= len(bucket)+1 &&
            indexKey[:len(bucket)+1] == bucket+"/" &&
            (prefix == "" || len(indexKey) >= len(expected) && indexKey[:len(expected)] == expected) {
            results = append(results, versions[0])
        }
    }
    sort.Slice(results, func(i, j int) bool {
        return results[i].Key < results[j].Key
    })
    return results
}

func (s *ObjectStore) GeneratePresignedURL(bucket, key string, expiresIn int) string {
    // Presigned URLs allow temporary public access to private objects
    // => signed with HMAC to prevent URL forgery
    expiry := time.Now().Unix() + int64(expiresIn)
    hash := sha256.Sum256([]byte(fmt.Sprintf("%s/%s/%d", bucket, key, expiry)))
    token := fmt.Sprintf("%x", hash)[:16]
    return fmt.Sprintf("https://storage.example.com/%s/%s?expiry=%d&token=%s",
        bucket, key, expiry, token)
}

func main() {
    store := NewObjectStore()

    // Upload user avatar
    avatarBytes := append([]byte{0xff, 0xd8, 0xff}, make([]byte, 1000)...)
    avatarBytes = append(avatarBytes, 0xff, 0xd9)
    meta := store.Put("profile-images", "users/alice/avatar.jpg",
        avatarBytes, "image/jpeg",
        map[string]string{"user_id": "alice", "original_filename": "photo.jpg"})
    fmt.Printf("Uploaded: %d bytes, etag=%s...\n", meta.SizeBytes, meta.ETag[:8])

    // Upload updated avatar (creates new version)
    updatedBytes := append([]byte{0xff, 0xd8, 0xff}, make([]byte, 2000)...)
    updatedBytes = append(updatedBytes, 0xff, 0xd9)
    meta2 := store.Put("profile-images", "users/alice/avatar.jpg",
        updatedBytes, "image/jpeg", nil)
    fmt.Printf("Updated:  %d bytes (new version)\n", meta2.SizeBytes)

    // Retrieve latest version
    obj := store.Get("profile-images", "users/alice/avatar.jpg", "")
    fmt.Printf("GET latest: %d bytes, version=%s...\n",
        obj.Metadata.SizeBytes, obj.Metadata.VersionID[:8])

    // Retrieve original version by versionID
    original := store.Get("profile-images", "users/alice/avatar.jpg", meta.VersionID)
    fmt.Printf("GET v1:     %d bytes (original preserved)\n", original.Metadata.SizeBytes)

    // Presigned URL for temporary public access
    url := store.GeneratePresignedURL("profile-images", "users/alice/avatar.jpg", 300)
    fmt.Printf("Presigned URL: %s\n", url)

    // List all user objects
    objects := store.ListObjects("profile-images", "users/alice/")
    keys := make([]string, len(objects))
    for i, o := range objects {
        keys[i] = o.Key
    }
    fmt.Printf("Objects under users/alice/: %v\n", keys)

    
}

Key takeaway: Object storage’s flat namespace with version IDs and content-addressed ETags enables immutable, versioned file storage with integrity verification — each upload creates a new addressable version without overwriting the old one.

Why It Matters: Traditional file servers don’t scale to billions of files or exabytes of data. S3, Google Cloud Storage, and Azure Blob Storage solve this with eventual consistency, geo-replication, and virtually unlimited storage. Presigned URLs eliminate the need to proxy large file downloads through application servers — clients download directly from storage, dramatically reducing application server bandwidth and cost.


Logging and Monitoring

Example 51: Structured Logging with Correlation IDs

Structured logging emits log entries as machine-parseable JSON rather than free-form text, enabling log aggregation platforms (Datadog, ELK Stack) to index, filter, and alert on specific fields. Correlation IDs link all log entries from a single request across multiple services, enabling distributed tracing.

package main

import (
    "encoding/json"
    "fmt"
    "sync"
    "time"

)

// goroutine-local context via explicit parameter passing (Go idiom)
// Go doesn't have thread-local storage; context is passed explicitly.

type RequestContext struct {
    CorrelationID string
    UserID        string
}

type StructuredLogger struct {
    service     string
    environment string
}

func NewStructuredLogger(service, environment string) *StructuredLogger {
    return &StructuredLogger{service: service, environment: environment}
}

func (l *StructuredLogger) buildEntry(level, message string, ctx *RequestContext, extra map[string]interface{}) map[string]interface{} {
    entry := map[string]interface{}{
        "timestamp":   time.Now().UTC().Format("2006-01-02T15:04:05Z"),
        "level":       level,
        "service":     l.service,
        "environment": l.environment,
        "message":     message,
    }
    // Inject correlation ID if set in context
    // => links this log entry to the originating HTTP request
    if ctx != nil && ctx.CorrelationID != "" {
        entry["correlation_id"] = ctx.CorrelationID
    }
    if ctx != nil && ctx.UserID != "" {
        entry["user_id"] = ctx.UserID
    }
    for k, v := range extra {
        entry[k] = v
    }
    return entry
}

func (l *StructuredLogger) emit(entry map[string]interface{}) {
    // Emit as single-line JSON — log aggregators parse this
    // => newlines within fields are escaped; one log event = one JSON line
    b, _ := json.Marshal(entry)
    fmt.Println(string(b))
}

func (l *StructuredLogger) Info(msg string, ctx *RequestContext, extra map[string]interface{}) {
    l.emit(l.buildEntry("INFO", msg, ctx, extra))
}

func (l *StructuredLogger) Warn(msg string, ctx *RequestContext, extra map[string]interface{}) {
    l.emit(l.buildEntry("WARN", msg, ctx, extra))
}

func (l *StructuredLogger) Error(msg string, ctx *RequestContext, extra map[string]interface{}) {
    l.emit(l.buildEntry("ERROR", msg, ctx, extra))
}

func main() {
    // Simulate two microservices sharing the same correlation ID
    orderLogger := NewStructuredLogger("order-service", "production")
    paymentLogger := NewStructuredLogger("payment-service", "production")

    fmt.Println("=== Request 1: Successful order ===")
    ctx1 := &RequestContext{
        CorrelationID: fmt.Sprintf("%d", time.Now().UnixNano()),
        UserID:        "user-alice",
    }
    orderLogger.Info("Order received", ctx1, map[string]interface{}{
        "order_id": "ORD-001", "amount": 99.99, "items_count": 3,
    })
    // => {"timestamp":"...","level":"INFO","service":"order-service",
    //     "correlation_id":"<uuid>","user_id":"user-alice",
    //     "message":"Order received","order_id":"ORD-001","amount":99.99}

    // Simulate calling payment service (passes correlation_id in X-Correlation-ID header)
    // => correlation_id propagated via HTTP header to downstream services
    paymentLogger.Info("Payment initiated", ctx1, map[string]interface{}{
        "order_id": "ORD-001", "amount": 99.99, "gateway": "stripe",
    })
    paymentLogger.Info("Payment completed", ctx1, map[string]interface{}{
        "order_id": "ORD-001", "txn_id": "txn_abc123", "duration_ms": 142,
    })
    orderLogger.Info("Order confirmed", ctx1, map[string]interface{}{
        "order_id": "ORD-001", "status": "confirmed",
    })
    fmt.Printf("  All entries share correlation_id=%s... for tracing\n", ctx1.CorrelationID[:8])

    fmt.Println("\n=== Request 2: Payment failure with error context ===")
    ctx2 := &RequestContext{
        CorrelationID: fmt.Sprintf("%d", time.Now().UnixNano()),
        UserID:        "user-bob",
    }
    orderLogger.Info("Order received", ctx2, map[string]interface{}{
        "order_id": "ORD-002", "amount": 299.99,
    })

    paymentLogger.Error("Payment failed", ctx2, map[string]interface{}{
        "order_id":      "ORD-002",
        "error_type":    "ConnectionError",
        "error_message": "Payment gateway timeout after 5000ms",
        "retry_count":   3,
        "duration_ms":   5001,
    })
    orderLogger.Warn("Order failed, will retry", ctx2, map[string]interface{}{
        "order_id":           "ORD-002",
        "retry_scheduled_at": "2026-03-20T10:00:30Z",
    })

    // Unstructured log (bad) vs structured log (good)
    fmt.Println("\n=== Unstructured vs Structured comparison ===")
    fmt.Println("UNSTRUCTURED: ERROR payment failed for user bob order ORD-002 after 5001ms")
    fmt.Println("STRUCTURED:   {level:ERROR, user_id:user-bob, order_id:ORD-002, duration_ms:5001}")
    fmt.Println("=> Structured enables: filter by user_id, alert on duration_ms>5000, count by error_type")

    _ = sync.Mutex{} // suppress unused import
}

Key takeaway: Structured logging with correlation IDs enables distributed request tracing — a single correlation_id links all log entries across services, turning scattered log lines into a coherent request timeline.

Why It Matters: In microservices, a single user request spawns calls to 5-10 services. Without correlation IDs, debugging a failure means manually correlating timestamps across service logs — painful with millisecond precision. With structured logging, correlation_id = "abc-123" retrieves the complete request timeline across all services in one query. DataDog, Splunk, and the ELK Stack are built around structured log ingestion and querying.


Example 52: Metrics Collection and Alerting

Metrics capture quantitative system behavior over time — request counts, latency percentiles, error rates, resource utilization. A metrics system collects time-series data, stores it efficiently, and evaluates alert rules to notify on-call engineers when thresholds are breached.

package main

import (
    "fmt"
    "math"
    "math/rand"
    "sort"
    "sync"
    "time"
)

type MetricPoint struct {
    Value     float64
    Timestamp float64
    Labels    map[string]string
}

type MetricsRegistry struct {
    data      map[string][]MetricPoint
    counters  map[string]float64
    retention float64
    mu        sync.Mutex
}

func NewMetricsRegistry(retentionSeconds float64) *MetricsRegistry {
    return &MetricsRegistry{
        data:      make(map[string][]MetricPoint),
        counters:  make(map[string]float64),
        retention: retentionSeconds,
    }
}

func (r *MetricsRegistry) Record(name string, value float64, labels map[string]string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    // Record a metric observation — O(1) append
    point := MetricPoint{Value: value, Timestamp: float64(time.Now().UnixNano()) / 1e9, Labels: labels}
    r.data[name] = append(r.data[name], point)
    // Keep bounded
    if len(r.data[name]) > 10000 {
        r.data[name] = r.data[name][1:]
    }
}

func (r *MetricsRegistry) Increment(name string, by float64, labels map[string]string) {
    r.mu.Lock()
    r.counters[name] += by
    val := r.counters[name]
    r.mu.Unlock()
    r.Record(name, val, labels)
}

func (r *MetricsRegistry) recentValues(name string, windowSeconds float64) []float64 {
    r.mu.Lock()
    defer r.mu.Unlock()
    cutoff := float64(time.Now().UnixNano())/1e9 - windowSeconds
    var values []float64
    for _, p := range r.data[name] {
        if p.Timestamp >= cutoff {
            values = append(values, p.Value)
        }
    }
    return values
}

func (r *MetricsRegistry) Percentile(name string, p float64, windowSeconds float64) (float64, bool) {
    values := r.recentValues(name, windowSeconds)
    if len(values) == 0 {
        return 0, false
    }
    sort.Float64s(values)
    idx := int(float64(len(values)) * p / 100)
    if idx >= len(values) {
        idx = len(values) - 1
    }
    return values[idx], true
}

func (r *MetricsRegistry) Rate(name string, windowSeconds float64) float64 {
    values := r.recentValues(name, windowSeconds)
    if len(values) == 0 {
        return 0.0
    }
    return float64(len(values)) / windowSeconds
}

func (r *MetricsRegistry) Average(name string, windowSeconds float64) (float64, bool) {
    values := r.recentValues(name, windowSeconds)
    if len(values) == 0 {
        return 0, false
    }
    sum := 0.0
    for _, v := range values {
        sum += v
    }
    return sum / float64(len(values)), true
}

type AlertRule struct {
    Name          string
    Metric        string
    Aggregation   string // "p99", "p95", "avg", "rate"
    Threshold     float64
    Direction     string  // "above" or "below"
    WindowSeconds float64
    Severity      string // "warning" | "critical"
}

type AlertManager struct {
    registry *MetricsRegistry
    rules    []AlertRule
    firing   map[string]bool
}

func NewAlertManager(registry *MetricsRegistry) *AlertManager {
    return &AlertManager{registry: registry, firing: make(map[string]bool)}
}

func (a *AlertManager) AddRule(rule AlertRule) {
    a.rules = append(a.rules, rule)
}

func (a *AlertManager) Evaluate() []map[string]interface{} {
    var events []map[string]interface{}
    for _, rule := range a.rules {
        var current float64
        var ok bool

        if len(rule.Aggregation) > 1 && rule.Aggregation[0] == 'p' {
            pct := 0.0
            fmt.Sscanf(rule.Aggregation[1:], "%f", &pct)
            current, ok = a.registry.Percentile(rule.Metric, pct, rule.WindowSeconds)
        } else if rule.Aggregation == "avg" {
            current, ok = a.registry.Average(rule.Metric, rule.WindowSeconds)
        } else if rule.Aggregation == "rate" {
            current = a.registry.Rate(rule.Metric, rule.WindowSeconds)
            ok = current > 0
        }

        if !ok {
            continue
        }

        breached := (rule.Direction == "above" && current > rule.Threshold) ||
            (rule.Direction == "below" && current < rule.Threshold)

        if breached && !a.firing[rule.Name] {
            a.firing[rule.Name] = true
            events = append(events, map[string]interface{}{
                "type": "FIRING", "rule": rule.Name,
                "severity": rule.Severity,
                "current":  math.Round(current*100) / 100,
                "threshold": rule.Threshold, "metric": rule.Metric,
            })
        } else if !breached && a.firing[rule.Name] {
            delete(a.firing, rule.Name)
            events = append(events, map[string]interface{}{
                "type": "RESOLVED", "rule": rule.Name,
                "current": math.Round(current*100) / 100,
            })
        }
    }
    return events
}

func main() {
    registry := NewMetricsRegistry(300)
    alertManager := NewAlertManager(registry)

    // Define alert rules
    alertManager.AddRule(AlertRule{
        Name: "high_p99_latency", Metric: "http_request_duration_ms",
        Aggregation: "p99", Threshold: 500,
        Direction: "above", WindowSeconds: 60, Severity: "critical",
    })
    alertManager.AddRule(AlertRule{
        Name: "high_error_rate", Metric: "http_errors",
        Aggregation: "rate", Threshold: 0.05,
        Direction: "above", WindowSeconds: 60, Severity: "warning",
    })

    // Simulate normal traffic (low latency)
    for i := 0; i < 50; i++ {
        registry.Record("http_request_duration_ms", 20+rand.Float64()*130, nil)
        if rand.Float64() < 0.01 { // 1% error rate
            registry.Increment("http_errors", 1, nil)
        }
    }

    events := alertManager.Evaluate()
    fmt.Printf("Normal traffic alerts: %v\n", events)
    // => Normal traffic alerts: []

    // Simulate degraded service (high latency spike)
    for i := 0; i < 20; i++ {
        registry.Record("http_request_duration_ms", 600+rand.Float64()*600, nil)
        registry.Increment("http_errors", 1, nil)
    }

    events = alertManager.Evaluate()
    fmt.Println("Degraded service alerts:")
    for _, event := range events {
        fmt.Printf("  [%s] %s: current=%v > threshold=%v (severity=%s)\n",
            event["type"], event["rule"], event["current"],
            event["threshold"], event["severity"])
    }
    // => [FIRING] high_p99_latency: current=900.xx > threshold=500 (severity=critical)
    // => [FIRING] high_error_rate: current=0.xx > threshold=0.05 (severity=warning)
}

Key takeaway: Metrics with percentile aggregation and threshold-based alerting detect service degradation — P99 latency captures tail latency that averages hide, and rate-based error alerts fire before users report failures.

Why It Matters: Averages lie — a service with 95% requests at 10ms and 5% at 2000ms has a 110ms average, masking severe tail latency. P99 alerts catch this. Prometheus, Datadog, and CloudWatch all implement this metrics model. The four golden signals (latency, traffic, errors, saturation) define the minimal viable monitoring set for any production service.


Example 53: Distributed Tracing with Span Propagation

Distributed tracing records the causal chain of operations across services as a tree of “spans.” Each span captures start time, duration, service name, and operation type. A trace is the complete tree representing one user request. Trace context (trace_id, span_id) propagates via HTTP headers across service boundaries.

package main

import (
    "fmt"
    "sort"
    "strings"
    "time"

)

type Span struct {
    TraceID      string
    SpanID       string
    ParentSpanID string // empty for root span
    Service      string
    Operation    string
    StartTime    time.Time
    EndTime      time.Time
    Tags         map[string]interface{}
    Status       string // "ok" | "error"
    ErrorMessage string
}

func (s *Span) DurationMs() float64 {
    if s.EndTime.IsZero() {
        return 0
    }
    return float64(s.EndTime.Sub(s.StartTime).Microseconds()) / 1000.0
}

func (s *Span) Finish(status, errMsg string) {
    s.EndTime = time.Now()
    s.Status = status
    s.ErrorMessage = errMsg
}

type Tracer struct {
    service string
    spans   []*Span
}

func NewTracer(service string) *Tracer {
    return &Tracer{service: service}
}

func (t *Tracer) StartSpan(operation, traceID, parentSpanID string) *Span {
    if traceID == "" {
        traceID = fmt.Sprintf("%d", time.Now().UnixNano())
    }
    span := &Span{
        TraceID:      traceID,
        SpanID:       fmt.Sprintf("%d", time.Now().UnixNano())[:8],
        ParentSpanID: parentSpanID,
        Service:      t.service,
        Operation:    operation,
        StartTime:    time.Now(),
        Tags:         make(map[string]interface{}),
        Status:       "ok",
    }
    t.spans = append(t.spans, span)
    return span
}

func renderTrace(spans []*Span) string {
    if len(spans) == 0 {
        return "(no spans)"
    }
    // Build parentID -> children mapping
    children := make(map[string][]*Span)
    for _, span := range spans {
        children[span.ParentSpanID] = append(children[span.ParentSpanID], span)
    }

    var lines []string
    var renderNode func(parentID string, depth int)
    renderNode = func(parentID string, depth int) {
        nodeSpans := children[parentID]
        sort.Slice(nodeSpans, func(i, j int) bool {
            return nodeSpans[i].StartTime.Before(nodeSpans[j].StartTime)
        })
        for _, span := range nodeSpans {
            indent := strings.Repeat("  ", depth)
            dur := fmt.Sprintf("%.1fms", span.DurationMs())
            statusIcon := "OK"
            if span.Status != "ok" {
                statusIcon = "ERR"
            }
            lines = append(lines, fmt.Sprintf("%s[%s] %s.%s (%s) span=%s",
                indent, statusIcon, span.Service, span.Operation, dur, span.SpanID))
            renderNode(span.SpanID, depth+1)
        }
    }

    renderNode("", 0)
    return strings.Join(lines, "\n")
}

func main() {
    // Simulate a distributed trace: API -> OrderService -> PaymentService + InventoryService
    apiTracer := NewTracer("api-gateway")
    orderTracer := NewTracer("order-service")
    paymentTracer := NewTracer("payment-service")
    inventoryTracer := NewTracer("inventory-service")

    var allSpans []*Span
    traceID := fmt.Sprintf("%d", time.Now().UnixNano())

    // Root span: API gateway receives HTTP request
    rootSpan := apiTracer.StartSpan("HTTP POST /orders", traceID, "")
    allSpans = append(allSpans, rootSpan)
    time.Sleep(2 * time.Millisecond) // 2ms routing

    // Child span: order service processes the request
    orderSpan := orderTracer.StartSpan("createOrder", traceID, rootSpan.SpanID)
    allSpans = append(allSpans, orderSpan)
    orderSpan.Tags["order_id"] = "ORD-001"
    time.Sleep(5 * time.Millisecond) // 5ms business logic

    // Parallel children: payment and inventory called concurrently
    paySpan := paymentTracer.StartSpan("chargeCard", traceID, orderSpan.SpanID)
    allSpans = append(allSpans, paySpan)
    paySpan.Tags["amount"] = 99.99
    time.Sleep(45 * time.Millisecond) // 45ms payment processing
    paySpan.Finish("ok", "")

    invSpan := inventoryTracer.StartSpan("reserveItems", traceID, orderSpan.SpanID)
    allSpans = append(allSpans, invSpan)
    invSpan.Tags["sku"] = "WIDGET-A"
    time.Sleep(10 * time.Millisecond) // 10ms inventory check
    invSpan.Finish("ok", "")

    orderSpan.Finish("ok", "")
    rootSpan.Finish("ok", "")

    fmt.Println("=== Distributed Trace ===")
    fmt.Printf("Trace ID: %s...\n", traceID[:16])
    fmt.Println(renderTrace(allSpans))
    // => [OK] api-gateway.HTTP POST /orders (Xms) span=...
    // =>   [OK] order-service.createOrder (Xms) span=...
    // =>     [OK] payment-service.chargeCard (45ms) span=...
    // =>     [OK] inventory-service.reserveItems (10ms) span=...

    // Calculate critical path
    fmt.Printf("\nTotal request duration: %.1fms\n", allSpans[0].DurationMs())
    slowest := allSpans[0]
    for _, s := range allSpans {
        if s.DurationMs() > slowest.DurationMs() {
            slowest = s
        }
    }
    fmt.Printf("Slowest span: %s.%s (%.1fms)\n",
        slowest.Service, slowest.Operation, slowest.DurationMs())
    // => Slowest span: payment-service.chargeCard (45ms) — optimization target
}

Key takeaway: Distributed tracing connects operations across services into a causal trace tree, revealing the critical path — the sequence of spans that determines total request duration and where optimization effort should focus.

Why It Matters: Without distributed tracing, a 500ms request in a microservices system requires manually correlating timestamps across service logs. Jaeger, Zipkin, and AWS X-Ray implement OpenTelemetry to auto-instrument frameworks and collect traces. The critical path analysis reveals that optimizing a 5ms database query has negligible impact when a 450ms payment API call dominates — tracing makes this visible immediately.


Example 54: Key-Value Cache with TTL and LRU Eviction

An in-memory cache stores frequently accessed data to reduce database load. LRU (Least Recently Used) eviction removes the item accessed longest ago when the cache is full. TTL (Time-To-Live) ensures cached data doesn’t become stale indefinitely — each entry expires after a configured duration.

package main

import (
    "container/list"
    "fmt"
    "time"
)

type CacheEntry struct {
    Key       string
    Value     interface{}
    ExpiresAt time.Time
    Hits      int
}

// LRUCache with per-entry TTL and hit/miss tracking.
type LRUCache struct {
    maxSize    int
    defaultTTL time.Duration
    items      map[string]*list.Element
    order      *list.List // front = MRU, back = LRU
    hits       int
    misses     int
    evictions  int
}

func NewLRUCache(maxSize int, defaultTTL time.Duration) *LRUCache {
    return &LRUCache{
        maxSize:    maxSize,
        defaultTTL: defaultTTL,
        items:      make(map[string]*list.Element),
        // list maintains access order; we move to front on access
        // => most recently accessed item is always at front
        // => LRU item is always at back
        order: list.New(),
    }
}

func (c *LRUCache) Set(key string, value interface{}, ttl time.Duration) {
    if ttl == 0 {
        ttl = c.defaultTTL
    }
    expiresAt := time.Now().Add(ttl)
    entry := &CacheEntry{Key: key, Value: value, ExpiresAt: expiresAt}

    if elem, ok := c.items[key]; ok {
        // Update existing entry: remove and re-insert to move to MRU
        c.order.Remove(elem)
        delete(c.items, key)
    }

    elem := c.order.PushFront(entry)
    c.items[key] = elem

    // Enforce capacity: evict LRU item when over maxSize
    for c.order.Len() > c.maxSize {
        back := c.order.Back()
        evicted := back.Value.(*CacheEntry)
        c.order.Remove(back)
        delete(c.items, evicted.Key)
        c.evictions++
        // => Remove removes least recently used (back of list)
        fmt.Printf("[CACHE] LRU eviction: '%s' (size=%d)\n", evicted.Key, c.order.Len())
    }
}

func (c *LRUCache) Get(key string) (interface{}, bool) {
    elem, ok := c.items[key]
    if !ok {
        c.misses++
        return nil, false // cache miss
    }

    entry := elem.Value.(*CacheEntry)
    if time.Now().After(entry.ExpiresAt) {
        // TTL expired: treat as miss, remove stale entry
        c.order.Remove(elem)
        delete(c.items, key)
        c.misses++
        fmt.Printf("[CACHE] TTL expired: '%s'\n", key)
        return nil, false
    }

    // Cache hit: move to front (most recently used position)
    c.order.MoveToFront(elem)
    entry.Hits++
    c.hits++
    return entry.Value, true
}

func (c *LRUCache) Delete(key string) bool {
    if elem, ok := c.items[key]; ok {
        c.order.Remove(elem)
        delete(c.items, key)
        return true
    }
    return false
}

func (c *LRUCache) HitRate() float64 {
    total := c.hits + c.misses
    if total == 0 {
        return 0.0
    }
    return float64(c.hits) / float64(total)
}

func (c *LRUCache) Stats() map[string]interface{} {
    return map[string]interface{}{
        "size":      c.order.Len(),
        "max_size":  c.maxSize,
        "hits":      c.hits,
        "misses":    c.misses,
        "hit_rate":  fmt.Sprintf("%.1f%%", c.HitRate()*100),
        "evictions": c.evictions,
    }
}

func main() {
    cache := NewLRUCache(3, 2*time.Second)
    // => cache is empty, capacity=3

    // Fill the cache to max capacity
    cache.Set("user:1", map[string]string{"name": "Alice", "plan": "pro"}, 0)
    cache.Set("user:2", map[string]string{"name": "Bob", "plan": "free"}, 0)
    cache.Set("user:3", map[string]string{"name": "Carol", "plan": "enterprise"}, 0)
    // => cache has 3 entries (at capacity)

    // Access user:1 — moves it to "most recently used" front
    result, _ := cache.Get("user:1")
    fmt.Printf("GET user:1: %v\n", result.(map[string]string)["name"])
    // => GET user:1: Alice (cache hit, user:1 moves to MRU)
    // => LRU order is now: user:2 (LRU), user:3, user:1 (MRU)

    // Add a 4th entry — evicts user:2 (LRU)
    cache.Set("user:4", map[string]string{"name": "Dave", "plan": "free"}, 0)
    // => [CACHE] LRU eviction: 'user:2'

    _, ok := cache.Get("user:2")
    fmt.Printf("GET user:2: found=%v\n", ok)
    // => GET user:2: found=false (evicted — cache miss)

    // TTL expiry demo
    cache.Set("session:xyz", map[string]string{"token": "abc123"}, 100*time.Millisecond)
    time.Sleep(150 * time.Millisecond) // wait for TTL to expire
    _, ok = cache.Get("session:xyz")
    fmt.Printf("GET session:xyz after TTL: found=%v\n", ok)
    // => [CACHE] TTL expired: 'session:xyz'
    // => GET session:xyz after TTL: found=false

    fmt.Printf("Cache stats: %v\n", cache.Stats())
    // => Cache stats: map[evictions:1 hit_rate:33.3% hits:1 max_size:3 misses:2 size:3]
}

Key takeaway: LRU eviction preserves the working set (frequently accessed data) in cache while TTL bounds staleness — combining both policies ensures the cache serves fresh, high-value data within memory limits.

Why It Matters: Redis and Memcached implement LRU variants as their default eviction policy. Without TTL, cached database records become permanently stale after source updates. Without LRU, a one-time bulk scan (e.g., a background job reading all users) evicts the entire working set, causing a cache stampede. Redis’s maxmemory-policy allkeys-lru is the recommended default for most caching workloads.


Example 55: Write-Through and Write-Back Cache Patterns

Cache write strategies determine data consistency between cache and storage. Write-through updates both cache and storage synchronously on every write — strong consistency, higher write latency. Write-back (write-behind) updates the cache immediately and asynchronously flushes to storage — lower write latency, risk of data loss on cache crash.

package main

import (
    "fmt"
    "sync"
    "time"
)

// Simulated database (slow writes: 20ms)
type SlowDatabase struct {
    store      map[string]interface{}
    writeCount int
    mu         sync.Mutex
}

func NewSlowDatabase() *SlowDatabase {
    return &SlowDatabase{store: make(map[string]interface{})}
}

func (db *SlowDatabase) Write(key string, value interface{}) {
    time.Sleep(20 * time.Millisecond) // 20ms write latency
    db.mu.Lock()
    db.store[key] = value
    db.writeCount++
    db.mu.Unlock()
}

func (db *SlowDatabase) Read(key string) (interface{}, bool) {
    time.Sleep(5 * time.Millisecond) // 5ms read latency
    db.mu.Lock()
    defer db.mu.Unlock()
    v, ok := db.store[key]
    return v, ok
}

type WriteThroughCache struct {
    cache map[string]interface{}
    db    *SlowDatabase
    mu    sync.Mutex
}

func NewWriteThroughCache(db *SlowDatabase) *WriteThroughCache {
    return &WriteThroughCache{cache: make(map[string]interface{}), db: db}
}

func (c *WriteThroughCache) Write(key string, value interface{}) float64 {
    start := time.Now()
    // Write to both cache and DB synchronously
    // => write latency = DB write latency (20ms) + negligible cache write
    c.mu.Lock()
    c.cache[key] = value
    c.mu.Unlock()
    c.db.Write(key, value)
    // => DB write completes before this method returns — strong consistency
    // => if process crashes, data is already in DB — no data loss
    return float64(time.Since(start).Microseconds()) / 1000.0
}

func (c *WriteThroughCache) Read(key string) (interface{}, bool) {
    c.mu.Lock()
    v, ok := c.cache[key]
    c.mu.Unlock()
    if ok {
        return v, true
    }
    // Cache miss: read from DB and populate cache
    v, ok = c.db.Read(key)
    if ok {
        c.mu.Lock()
        c.cache[key] = v
        c.mu.Unlock()
    }
    return v, ok
}

type WriteBackCache struct {
    cache         map[string]interface{}
    db            *SlowDatabase
    dirty         map[string]bool
    writeBuffer   []struct{ key string; value interface{} }
    mu            sync.Mutex
    flushInterval time.Duration
    running       bool
}

func NewWriteBackCache(db *SlowDatabase, flushInterval time.Duration) *WriteBackCache {
    c := &WriteBackCache{
        cache:         make(map[string]interface{}),
        db:            db,
        dirty:         make(map[string]bool),
        flushInterval: flushInterval,
        running:       true,
    }
    go c.backgroundFlush()
    return c
}

func (c *WriteBackCache) Write(key string, value interface{}) float64 {
    start := time.Now()
    c.mu.Lock()
    c.cache[key] = value
    c.dirty[key] = true
    c.writeBuffer = append(c.writeBuffer, struct{ key string; value interface{} }{key, value})
    c.mu.Unlock()
    // Returns immediately — DB write happens in background
    // => write latency = cache write only (~microseconds)
    // => if crash occurs before flush, dirty data is lost
    return float64(time.Since(start).Microseconds()) / 1000.0
}

func (c *WriteBackCache) backgroundFlush() {
    for c.running {
        time.Sleep(c.flushInterval)
        c.mu.Lock()
        pending := make([]struct{ key string; value interface{} }, len(c.writeBuffer))
        copy(pending, c.writeBuffer)
        c.writeBuffer = nil
        c.dirty = make(map[string]bool)
        c.mu.Unlock()
        // Flush to DB outside the lock (slow operation)
        for _, item := range pending {
            c.db.Write(item.key, item.value)
            // => DB writes happen asynchronously in batch
        }
    }
}

func (c *WriteBackCache) Read(key string) (interface{}, bool) {
    c.mu.Lock()
    v, ok := c.cache[key]
    c.mu.Unlock()
    if ok {
        return v, true
    }
    v, ok = c.db.Read(key)
    if ok {
        c.mu.Lock()
        c.cache[key] = v
        c.mu.Unlock()
    }
    return v, ok
}

func (c *WriteBackCache) Shutdown() {
    c.running = false
}

func main() {
    db1 := NewSlowDatabase()
    db2 := NewSlowDatabase()
    wtCache := NewWriteThroughCache(db1)
    wbCache := NewWriteBackCache(db2, 50*time.Millisecond)

    fmt.Println("=== Write-Through (strong consistency) ===")
    var wtTimes []float64
    for i := 0; i < 3; i++ {
        latency := wtCache.Write(fmt.Sprintf("key:%d", i), fmt.Sprintf("value:%d", i))
        wtTimes = append(wtTimes, latency)
        fmt.Printf("  Write key:%d: %.1fms\n", i, latency)
    }
    avgWT := 0.0
    for _, t := range wtTimes {
        avgWT += t
    }
    avgWT /= float64(len(wtTimes))
    fmt.Printf("  Avg write latency: %.1fms\n", avgWT)
    fmt.Printf("  DB writes completed: %d\n", db1.writeCount)
    // => Each write: ~20ms (waits for DB)
    // => DB writes completed: 3 (synchronous)

    fmt.Println("\n=== Write-Back (low latency, async flush) ===")
    var wbTimes []float64
    for i := 0; i < 3; i++ {
        latency := wbCache.Write(fmt.Sprintf("key:%d", i), fmt.Sprintf("value:%d", i))
        wbTimes = append(wbTimes, latency)
        fmt.Printf("  Write key:%d: %.4fms\n", i, latency)
    }
    avgWB := 0.0
    for _, t := range wbTimes {
        avgWB += t
    }
    avgWB /= float64(len(wbTimes))
    fmt.Printf("  Avg write latency: %.4fms\n", avgWB)
    db2.mu.Lock()
    fmt.Printf("  DB writes completed immediately: %d\n", db2.writeCount)
    db2.mu.Unlock()
    // => Each write: ~0.001ms (returns after cache update)
    // => DB writes completed immediately: 0 (async flush pending)

    time.Sleep(200 * time.Millisecond) // wait for background flush
    db2.mu.Lock()
    fmt.Printf("  DB writes completed after flush: %d\n", db2.writeCount)
    db2.mu.Unlock()
    // => DB writes completed after flush: 3

    wbCache.Shutdown()

    fmt.Println("\n=== Trade-off summary ===")
    fmt.Printf("  Write-through: %.1fms/write, zero data loss risk\n", avgWT)
    fmt.Printf("  Write-back:    %.4fms/write, flush latency data loss risk\n", avgWB)
}

Key takeaway: Write-through prioritizes consistency over latency by persisting synchronously; write-back prioritizes throughput by batching DB writes asynchronously at the risk of losing unflushed data on crash.

Why It Matters: Write-back is the default strategy in database buffer pools (PostgreSQL’s shared_buffers, InnoDB buffer pool) — dirty pages are flushed asynchronously. This is why databases need WAL (Write-Ahead Logging) to survive crashes without losing committed transactions. Redis’s AOF (Append-Only File) provides durable write-back via fsync policies. Choosing the wrong strategy causes either latency spikes (write-through under write storms) or data loss (write-back without durable journals).


Example 56: Load Balancer Algorithms — Round Robin, Least Connections, and Weighted

A load balancer distributes requests across server instances to prevent any single server from becoming a bottleneck. Different algorithms suit different workloads: round-robin is simple and uniform; least-connections routes to the least-busy server; weighted routing handles heterogeneous hardware.

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type BackendServer struct {
    ServerID          string
    Host              string
    Port              int
    Weight            int
    ActiveConnections int
    TotalRequests     int
    mu                sync.Mutex
}

func (s *BackendServer) HandleRequest(duration time.Duration) map[string]interface{} {
    s.mu.Lock()
    s.ActiveConnections++
    s.TotalRequests++
    s.mu.Unlock()
    time.Sleep(duration) // simulate request processing
    s.mu.Lock()
    s.ActiveConnections--
    s.mu.Unlock()
    return map[string]interface{}{
        "served_by":   s.ServerID,
        "duration_ms": duration.Milliseconds(),
    }
}

type LoadBalancer struct {
    servers []*BackendServer
    rrIndex int
    mu      sync.Mutex
}

func NewLoadBalancer(servers []*BackendServer) *LoadBalancer {
    return &LoadBalancer{servers: servers}
}

func (lb *LoadBalancer) RoundRobin() *BackendServer {
    // Distribute requests cyclically — simple, ignores server load.
    lb.mu.Lock()
    defer lb.mu.Unlock()
    server := lb.servers[lb.rrIndex%len(lb.servers)]
    lb.rrIndex++
    return server
}

func (lb *LoadBalancer) LeastConnections() *BackendServer {
    // Route to server with fewest active connections — adapts to load.
    lb.mu.Lock()
    defer lb.mu.Unlock()
    // Choose server with minimum activeConnections — O(N) scan
    // => production: maintain min-heap for O(log N) selection
    best := lb.servers[0]
    for _, s := range lb.servers[1:] {
        s.mu.Lock()
        bestConns := best.ActiveConnections
        sConns := s.ActiveConnections
        s.mu.Unlock()
        best.mu.Lock()
        bestConns = best.ActiveConnections
        best.mu.Unlock()
        if sConns < bestConns {
            best = s
        }
    }
    return best
}

func (lb *LoadBalancer) WeightedRoundRobin() *BackendServer {
    // Weight-aware round-robin — powerful servers get proportionally more traffic.
    lb.mu.Lock()
    defer lb.mu.Unlock()
    // Expand server list by weight: server with weight=3 appears 3 times
    // => production: use GCD normalization to avoid large lists
    var pool []*BackendServer
    for _, s := range lb.servers {
        for i := 0; i < s.Weight; i++ {
            pool = append(pool, s)
        }
    }
    server := pool[lb.rrIndex%len(pool)]
    lb.rrIndex++
    return server
}

func (lb *LoadBalancer) DistributionStats() map[string]int {
    stats := make(map[string]int)
    for _, s := range lb.servers {
        stats[s.ServerID] = s.TotalRequests
    }
    return stats
}

func main() {
    // Three servers: server-3 is 2x as powerful (weight=2)
    servers := []*BackendServer{
        {ServerID: "server-1", Host: "10.0.0.1", Port: 8080, Weight: 1},
        {ServerID: "server-2", Host: "10.0.0.2", Port: 8080, Weight: 1},
        {ServerID: "server-3", Host: "10.0.0.3", Port: 8080, Weight: 2}, // 2x capacity
    }
    lb := NewLoadBalancer(servers)

    simulateRequests := func(algorithm string, count int) {
        for _, s := range servers {
            s.TotalRequests = 0
            s.ActiveConnections = 0
        }
        lb.mu.Lock()
        lb.rrIndex = 0
        lb.mu.Unlock()

        var wg sync.WaitGroup
        for i := 0; i < count; i++ {
            var server *BackendServer
            switch algorithm {
            case "round_robin":
                server = lb.RoundRobin()
            case "least_connections":
                server = lb.LeastConnections()
            default:
                server = lb.WeightedRoundRobin()
            }
            duration := time.Duration(1+rand.Intn(10)) * time.Millisecond
            wg.Add(1)
            go func(s *BackendServer, d time.Duration) {
                defer wg.Done()
                s.HandleRequest(d)
            }(server, duration)
            time.Sleep(1 * time.Millisecond) // small delay between requests
        }
        wg.Wait()
    }

    fmt.Println("=== Round Robin (uniform weights) ===")
    simulateRequests("round_robin", 30)
    stats := lb.DistributionStats()
    fmt.Printf("  Distribution: %v\n", stats)
    // => Distribution: map[server-1:10 server-2:10 server-3:10]
    // => Equal distribution — ignores server-3's higher capacity

    fmt.Println("\n=== Least Connections (adapts to load) ===")
    simulateRequests("least_connections", 30)
    stats = lb.DistributionStats()
    fmt.Printf("  Distribution: %v\n", stats)
    // => Distribution: roughly equal for uniform request durations

    fmt.Println("\n=== Weighted Round Robin (capacity-aware) ===")
    simulateRequests("weighted_round_robin", 40)
    stats = lb.DistributionStats()
    fmt.Printf("  Distribution: %v\n", stats)
    // => Distribution: server-1 gets ~25%, server-2 ~25%, server-3 ~50%
    // => server-3 receives 2x traffic proportional to its weight
    total := 0
    for _, c := range stats {
        total += c
    }
    for sid, count := range stats {
        fmt.Printf("    %s: %d/%d = %.0f%%\n", sid, count, total, float64(count)/float64(total)*100)
    }
}

Key takeaway: Round-robin distributes uniformly across identical servers; least-connections adapts to variable request durations; weighted routing handles heterogeneous hardware by routing proportionally to server capacity.

Why It Matters: AWS ALB and Nginx use round-robin by default. Least-connections is critical when request processing times vary significantly — a slow endpoint that takes 500ms will back up requests on one server if using round-robin. HAProxy’s balance leastconn implements least-connections. Weighted routing enables phased migrations: gradually increase weight on new instance type while decreasing weight on old, achieving zero-downtime hardware upgrades.


Example 57: Idempotency Keys for Safe Retries

Idempotency ensures that retrying an operation produces the same result as executing it once. Without idempotency, retrying a failed payment request may charge the user twice. Idempotency keys allow clients to safely retry operations — the server deduplicates based on the key and returns the original result.

package main

import (
    "crypto/sha256"
    "fmt"
    "sort"
    "time"

)

type IdempotencyRecord struct {
    Key         string
    RequestHash string
    Response    map[string]interface{}
    Status      string // "processing" | "completed" | "failed"
    CreatedAt   float64
    CompletedAt float64
}

type IdempotencyStore struct {
    store map[string]*IdempotencyRecord
    ttl   float64 // seconds
}

func NewIdempotencyStore(ttlSeconds float64) *IdempotencyStore {
    return &IdempotencyStore{
        store: make(map[string]*IdempotencyRecord),
        ttl:   ttlSeconds,
    }
}

func (s *IdempotencyStore) Get(key string) *IdempotencyRecord {
    record, ok := s.store[key]
    if !ok {
        return nil
    }
    if float64(time.Now().Unix())-record.CreatedAt > s.ttl {
        delete(s.store, key)
        return nil
    }
    return record
}

func (s *IdempotencyStore) Create(key, requestHash string) *IdempotencyRecord {
    record := &IdempotencyRecord{
        Key:         key,
        RequestHash: requestHash,
        Status:      "processing",
        CreatedAt:   float64(time.Now().Unix()),
    }
    s.store[key] = record
    return record
}

func (s *IdempotencyStore) Complete(key string, response map[string]interface{}, status string) {
    if record, ok := s.store[key]; ok {
        record.Response = response
        record.Status = status
        record.CompletedAt = float64(time.Now().Unix())
    }
}

type PaymentService struct {
    idempotencyStore *IdempotencyStore
    actualCharges    []map[string]interface{}
    chargeCount      int
}

func NewPaymentService() *PaymentService {
    return &PaymentService{
        idempotencyStore: NewIdempotencyStore(86400), // 24-hour TTL
    }
}

func (svc *PaymentService) Charge(idempotencyKey, orderID string, amount float64, currency string) (map[string]interface{}, error) {
    // Build request hash from sorted body fields
    bodyStr := fmt.Sprintf("[amount:%.2f currency:%s order_id:%s]", amount, currency, orderID)
    hash := sha256.Sum256([]byte(bodyStr))
    requestHash := fmt.Sprintf("%x", hash)

    // Check idempotency store BEFORE processing
    existing := svc.idempotencyStore.Get(idempotencyKey)
    if existing != nil {
        if existing.RequestHash != requestHash {
            // Same key, different payload — reject with 422
            // => prevents key reuse for different operations
            return map[string]interface{}{
                "error": "Idempotency key already used with different request body",
                "status": 422,
            }, nil
        }
        if existing.Status == "completed" {
            // Duplicate request: return cached response — no side effects
            // => client retried a completed request; replay the original result
            fmt.Printf("[PAYMENT] IDEMPOTENT replay for key=%s...\n", idempotencyKey[:16])
            return existing.Response, nil
        }
        if existing.Status == "processing" {
            return map[string]interface{}{
                "error": "Request is already being processed", "status": 409,
            }, nil
        }
    }

    // First request: create idempotency record BEFORE processing
    svc.idempotencyStore.Create(idempotencyKey, requestHash)

    // Process the payment
    svc.chargeCount++
    chargeNum := svc.chargeCount
    if chargeNum == 2 {
        // Simulate a network failure during processing — client will retry
        svc.idempotencyStore.Complete(idempotencyKey, nil, "failed")
        return nil, fmt.Errorf("Payment gateway timeout")
    }

    // Payment succeeded
    txnID := fmt.Sprintf("TXN-%s", fmt.Sprintf("%d", time.Now().UnixNano())[:8])
    response := map[string]interface{}{
        "status":     "success",
        "txn_id":     txnID,
        "order_id":   orderID,
        "amount":     amount,
        "currency":   currency,
        "charged_at": float64(time.Now().Unix()),
    }
    svc.actualCharges = append(svc.actualCharges, map[string]interface{}{
        "txn_id": txnID, "amount": amount,
    })
    svc.idempotencyStore.Complete(idempotencyKey, response, "completed")
    fmt.Printf("[PAYMENT] Charged $%.2f -> %s (charge #%d)\n", amount, txnID, chargeNum)
    return response, nil
}

func main() {
    svc := NewPaymentService()
    idemKey := fmt.Sprintf("%d", time.Now().UnixNano())

    fmt.Println("=== Scenario 1: First successful charge ===")
    result1, _ := svc.Charge(idemKey, "ORD-001", 99.99, "USD")
    fmt.Printf("  Result: %s txn=%s\n", result1["status"], result1["txn_id"])

    fmt.Println("\n=== Scenario 2: Retry of same request (network retry) ===")
    result2, _ := svc.Charge(idemKey, "ORD-001", 99.99, "USD")
    fmt.Printf("  Result: %s txn=%s\n", result2["status"], result2["txn_id"])
    fmt.Printf("  Same txn_id? %v\n", result1["txn_id"] == result2["txn_id"])
    // => [PAYMENT] IDEMPOTENT replay for key=...
    // => Same txn_id? true — same result returned, no second charge

    fmt.Printf("\nActual charges in ledger: %d\n", len(svc.actualCharges))
    // => Actual charges in ledger: 1 — retried request was NOT re-processed

    fmt.Println("\n=== Scenario 3: New request with different idempotency key ===")
    newKey := fmt.Sprintf("%d", time.Now().UnixNano())
    result3, err := svc.Charge(newKey, "ORD-002", 49.99, "USD")
    if err != nil {
        fmt.Printf("  Error: %s\n", err)
    } else {
        fmt.Printf("  Result: %s txn=%s\n", result3["status"], result3["txn_id"])
    }
    fmt.Printf("Actual charges after new key: %d\n", len(svc.actualCharges))
    // => Actual charges after new key: 2 — new key = new charge

    _ = sort.Strings // suppress unused import
}

Key takeaway: Idempotency keys enable safe retries by recording request outcomes and replaying the original response on duplicates — ensuring “at-most-once” semantics for state-changing operations.

Why It Matters: HTTP is unreliable — requests time out, connections reset, load balancers drop packets. Without idempotency, every timeout forces a choice between “retry and risk double-charging” or “don’t retry and risk losing the transaction.” Stripe, PayPal, and Twilio require idempotency keys for payment and messaging APIs. The Stripe API header Idempotency-Key is the canonical implementation. Idempotency also enables exactly-once processing in message queues when combined with at-least-once delivery.

Last updated