Cluster Replication

TopGun clusters automatically replicate data across nodes for high availability and fault tolerance. This guide covers server-side replication configuration, consistency levels, and anti-entropy repair.

Overview

  • Partitioned Data: 271 partitions with consistent hashing
  • Backup Replicas: Each partition replicated to backup nodes
  • Anti-Entropy Repair: Merkle tree-based consistency verification

Cluster Setup

import { ServerCoordinator } from '@topgunbuild/server';

// Node 1 (seed node)
const node1 = new ServerCoordinator({
  port: 8080,
  nodeId: 'node-1',
  host: 'localhost',
  clusterPort: 9000,
  peers: [],  // First node has no peers
  jwtSecret: process.env.JWT_SECRET,
  replicationEnabled: true,
  defaultConsistency: 'EVENTUAL',
});

await node1.ready();

// Node 2 (joins via seed)
const node2 = new ServerCoordinator({
  port: 8081,
  nodeId: 'node-2',
  host: 'localhost',
  clusterPort: 9001,
  peers: ['localhost:9000'],  // Connect to node-1
  jwtSecret: process.env.JWT_SECRET,
  replicationEnabled: true,
  defaultConsistency: 'EVENTUAL',
});

await node2.ready();

Gossip Protocol: You only need to provide one seed node. Nodes automatically discover each other via the gossip protocol: when node2 connects to node1, it learns about every other cluster member.

Gossip-Based Discovery

// Cluster membership uses gossip protocol
// Nodes automatically discover each other

const server = new ServerCoordinator({
  clusterPort: 9000,
  peers: ['node1:9000'],  // Just one seed is enough
});

// When node2 joins:
// 1. Connects to seed node (node1)
// 2. Sends HELLO message with its info
// 3. Receives MEMBER_LIST with all known members
// 4. Connects to discovered members
// 5. Broadcasts updated member list

// Result: All nodes learn about all others automatically

Consistency Levels

TopGun supports three consistency levels for replication:

// EVENTUAL - Fire-and-forget replication (default)
// Best for: High throughput, tolerance for brief inconsistency
const eventualServer = new ServerCoordinator({
  defaultConsistency: 'EVENTUAL',
  replicationConfig: {
    batchIntervalMs: 50,   // Batch replication every 50ms
    batchSize: 100,        // Max operations per batch
    queueSizeLimit: 10000, // Max queued operations
  },
});

// QUORUM - Wait for majority of replicas
// Best for: Balance of consistency and availability
const quorumServer = new ServerCoordinator({
  defaultConsistency: 'QUORUM',
  replicationConfig: {
    ackTimeoutMs: 5000,  // Wait up to 5s for acks
  },
});

// STRONG - Wait for all replicas
// Best for: Critical data requiring full consistency
const strongServer = new ServerCoordinator({
  defaultConsistency: 'STRONG',
  replicationConfig: {
    ackTimeoutMs: 10000, // Longer timeout for all replicas
  },
});

Comparison

Level      Latency   Consistency             Availability   Use Case
EVENTUAL   Lowest    Eventually consistent   Highest        Real-time updates, gaming, social feeds
QUORUM     Medium    Majority agree          High           Most applications, shopping carts
STRONG     Highest   All replicas agree      Lower          Financial data, inventory counts

Partitioning

// TopGun uses consistent hashing with 271 partitions
// Each partition has an owner and backup nodes

// Data is automatically distributed:
// - Key is hashed to partition ID (0-270)
// - Partition owner stores the primary copy
// - Backup nodes store replica copies

// Example: With 3 nodes and BACKUP_COUNT=1
// - Node A owns ~90 partitions, backs up ~90 others
// - Node B owns ~90 partitions, backs up ~90 others
// - Node C owns ~90 partitions, backs up ~90 others

// Total: Each piece of data exists on 2 nodes (owner + 1 backup)
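
To make the key-to-partition mapping concrete, here is a minimal sketch of hashing a key into one of the 271 partitions. The hash function and the names (PARTITION_COUNT, partitionFor) are illustrative assumptions, not TopGun's internal implementation.

// Illustrative sketch only: the hash function and names below are assumptions.
const PARTITION_COUNT = 271;

// A simple FNV-1a string hash (stand-in for whatever hash TopGun actually uses).
function fnv1a(key: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0;
}

// Map a key to its partition ID (0-270); the partition table then assigns
// that partition an owner node plus BACKUP_COUNT backup nodes.
function partitionFor(key: string): number {
  return fnv1a(key) % PARTITION_COUNT;
}

console.log(partitionFor('user:42')); // a partition ID in the range 0-270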

Data Flow

  1. Write arrives at any node
  2. Routing: If not owner, forward to partition owner
  3. Owner stores data locally using CRDT merge
  4. Replication: Owner sends to backup nodes
  5. Acknowledgment: Based on consistency level

Important: Only the partition owner and its backups store the data. Other nodes route requests but don’t keep copies. This is the partitioned data grid model, not full replication.
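
A minimal sketch of this write path under the three consistency levels follows; all names (handleWrite, BackupNode, waitForN) are hypothetical and only mirror the numbered steps above, not TopGun's actual internals.

// Hypothetical sketch of the owner-side write path; names are illustrative.
type Consistency = 'EVENTUAL' | 'QUORUM' | 'STRONG';

interface BackupNode {
  replicate(key: string, value: unknown): Promise<void>;
}

interface LocalStore {
  merge(key: string, value: unknown): Promise<void>;
}

// Resolve once `n` of the given promises have resolved.
async function waitForN(promises: Promise<void>[], n: number): Promise<void> {
  if (n <= 0) return;
  let done = 0;
  await new Promise<void>((resolve, reject) => {
    for (const p of promises) {
      p.then(() => { if (++done >= n) resolve(); }, reject);
    }
  });
}

async function handleWrite(
  store: LocalStore,
  backups: BackupNode[],
  key: string,
  value: unknown,
  consistency: Consistency,
): Promise<void> {
  // Step 3: the partition owner merges the write into its local CRDT state.
  await store.merge(key, value);

  // Step 4: the owner sends the operation to its backup nodes.
  const acks = backups.map((b) => b.replicate(key, value));

  // Step 5: acknowledgment depends on the consistency level.
  if (consistency === 'EVENTUAL') return;       // fire-and-forget: acks are not awaited
  const majority = Math.floor((backups.length + 1) / 2) + 1;
  if (consistency === 'QUORUM') {
    return waitForN(acks, majority - 1);        // the owner itself counts toward the majority
  }
  await Promise.all(acks);                      // STRONG: wait for every backup
}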

Anti-Entropy Repair

TopGun uses Merkle trees for efficient anti-entropy repair:

// Anti-entropy repair runs automatically
const server = new ServerCoordinator({
  replicationEnabled: true,
  // RepairScheduler configuration (defaults shown)
  repairConfig: {
    enabled: true,
    scanIntervalMs: 300000,    // Full scan every 5 minutes
    repairBatchSize: 1000,     // Keys per repair batch
    maxConcurrentRepairs: 2,   // Parallel repair streams
    throttleMs: 100,           // Delay between batches
    prioritizeRecent: true,    // Repair recent data first
  },
});

// Repair process:
// 1. MerkleTreeManager builds hash tree of partition data
// 2. RepairScheduler compares Merkle roots with backup nodes
// 3. Differences trigger targeted data sync
// 4. Uses efficient bucket-level comparison before key-level

How It Works

  1. Build Merkle Trees: Each node builds a Merkle tree of its partition data, hashing keys into buckets.
  2. Compare Roots: RepairScheduler periodically compares Merkle roots between owner and backup.
  3. Drill Down: If roots differ, compare bucket hashes to find divergent buckets.
  4. Repair Keys: Fetch and sync only the specific keys that differ, minimizing network traffic.
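
A sketch of that bucket-then-key comparison follows, assuming simplified shapes for the Merkle summary and replica API; these are not the actual MerkleTreeManager interfaces.

// Illustrative sketch of Merkle-based repair; shapes and names are assumed.
interface MerkleSummary {
  root: string;                  // hash over all bucket hashes
  buckets: Map<number, string>;  // bucketId -> bucket hash
}

interface Replica {
  merkleSummary(partitionId: number): Promise<MerkleSummary>;
  keysInBucket(partitionId: number, bucketId: number): Promise<Map<string, string>>; // key -> value hash
}

async function findDivergentKeys(
  partitionId: number,
  owner: Replica,
  backup: Replica,
): Promise<string[]> {
  const [a, b] = await Promise.all([
    owner.merkleSummary(partitionId),
    backup.merkleSummary(partitionId),
  ]);

  // Fast path: identical roots mean the replicas already agree.
  if (a.root === b.root) return [];

  // Bucket-level comparison: only drill into buckets whose hashes differ.
  const divergent: string[] = [];
  for (const [bucketId, bucketHash] of a.buckets) {
    if (b.buckets.get(bucketId) === bucketHash) continue;
    const [ownerKeys, backupKeys] = await Promise.all([
      owner.keysInBucket(partitionId, bucketId),
      backup.keysInBucket(partitionId, bucketId),
    ]);
    // Key-level comparison: collect only the keys whose value hashes differ.
    for (const [key, valueHash] of ownerKeys) {
      if (backupKeys.get(key) !== valueHash) divergent.push(key);
    }
    for (const key of backupKeys.keys()) {
      if (!ownerKeys.has(key)) divergent.push(key);
    }
  }
  return divergent; // only these keys are then fetched and synced
}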

Failover Handling

// Automatic failover on node failure
// FailureDetector uses Phi Accrual algorithm

const server = new ServerCoordinator({
  // FailureDetector config (defaults shown)
  failureDetector: {
    heartbeatIntervalMs: 1000,
    suspicionTimeoutMs: 5000,
    confirmationTimeoutMs: 10000,
    phiThreshold: 8,
  },
});

// Failover process:
// 1. Heartbeats stop arriving from failed node
// 2. Phi value exceeds threshold -> node marked SUSPECT
// 3. After confirmationTimeout -> node marked FAILED
// 4. PartitionService rebalances:
//    - Backup nodes become new owners
//    - Remaining nodes become new backups
// 5. Clients receive updated partition map

Phi Accrual Failure Detector

TopGun uses the Phi Accrual failure detector algorithm, which:

  • Tracks heartbeat arrival times
  • Calculates probability of node failure
  • Adapts to network conditions automatically
  • Avoids false positives from temporary network issues
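
For intuition, here is a generic Phi Accrual sketch showing how a phi value can be derived from heartbeat history; it is not TopGun's FailureDetector implementation.

// Generic Phi Accrual sketch; not TopGun's actual FailureDetector code.
class PhiAccrual {
  private intervals: number[] = [];
  private lastHeartbeat = Date.now();

  // Record a heartbeat and remember the interval since the previous one.
  heartbeat(now = Date.now()): void {
    this.intervals.push(now - this.lastHeartbeat);
    if (this.intervals.length > 1000) this.intervals.shift(); // sliding window
    this.lastHeartbeat = now;
  }

  // phi = -log10(probability that a heartbeat still arrives after `now`),
  // modeling inter-arrival times as a normal distribution.
  phi(now = Date.now()): number {
    const n = this.intervals.length;
    if (n === 0) return 0;
    const mean = this.intervals.reduce((a, b) => a + b, 0) / n;
    const variance =
      this.intervals.reduce((a, b) => a + (b - mean) ** 2, 0) / n || 1e-6;
    const stdDev = Math.sqrt(variance);
    const y = (now - this.lastHeartbeat - mean) / stdDev;
    // Logistic approximation of P(X > elapsed) for a normal distribution.
    const pLater = 1 / (1 + Math.exp(y * 1.702));
    return -Math.log10(Math.max(pLater, 1e-12));
  }
}

// A node is marked SUSPECT once its phi value exceeds the configured phiThreshold (8 above).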

Read Replicas

// Read from replicas for lower latency
const server = new ServerCoordinator({
  replicationEnabled: true,
  // ReadReplicaHandler is automatically enabled
});

// Read routing options:
// - Primary: Always read from partition owner
// - Replica: Read from nearest replica (lower latency)
// - Any: Read from any node that has the data

// Clients can specify read preference per query
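
A client-side sketch of expressing a read preference might look like the following; the readPreference option name and its values are hypothetical and shown only to illustrate the three routing choices above, not a confirmed client API.

// Hypothetical client-side sketch; option names are illustrative assumptions.
type ReadPreference = 'primary' | 'replica' | 'any';

interface QueryOptions {
  readPreference?: ReadPreference;
}

// Consistency-sensitive read: always hit the partition owner.
const critical: QueryOptions = { readPreference: 'primary' };

// Latency-sensitive read: allow the nearest replica to answer.
const fast: QueryOptions = { readPreference: 'replica' };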

Docker Compose Example

Deploy a 3-node cluster with Docker Compose:

version: '3.8'

services:
  node1:
    image: topgun:latest
    ports:
      - "8080:8080"
      - "9000:9000"
    environment:
      TOPGUN_NODE_ID: node-1
      TOPGUN_PORT: 8080
      TOPGUN_CLUSTER_PORT: 9000
      TOPGUN_PEERS: ""
      TOPGUN_REPLICATION: "true"
      TOPGUN_CONSISTENCY: EVENTUAL

  node2:
    image: topgun:latest
    ports:
      - "8081:8080"
      - "9001:9000"
    environment:
      TOPGUN_NODE_ID: node-2
      TOPGUN_PORT: 8080
      TOPGUN_CLUSTER_PORT: 9000
      TOPGUN_PEERS: node1:9000
      TOPGUN_REPLICATION: "true"
      TOPGUN_CONSISTENCY: EVENTUAL
    depends_on:
      - node1

  node3:
    image: topgun:latest
    ports:
      - "8082:8080"
      - "9002:9000"
    environment:
      TOPGUN_NODE_ID: node-3
      TOPGUN_PORT: 8080
      TOPGUN_CLUSTER_PORT: 9000
      TOPGUN_PEERS: node1:9000
      TOPGUN_REPLICATION: "true"
      TOPGUN_CONSISTENCY: EVENTUAL
    depends_on:
      - node1

Run with:

docker-compose up -d

Monitoring

Prometheus Metrics

# Replication metrics available via Prometheus
# GET /metrics on metricsPort

# Replication queue size per node
topgun_replication_queue_size{node="node-2"} 15

# Pending synchronous acknowledgments
topgun_replication_pending_acks 3

# Replication lag in milliseconds
topgun_replication_lag_ms{node="node-2"} 42

# Health status
topgun_replication_healthy 1
topgun_replication_unhealthy_nodes 0

Health Checks

The ReplicationPipeline.getHealth() method returns:

interface ReplicationHealth {
  healthy: boolean;          // Overall health status
  unhealthyNodes: string[];  // Nodes with issues
  laggyNodes: string[];      // Nodes with high lag
  avgLagMs: number;          // Average replication lag
}
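
A polling sketch that feeds this report into simple alerting might look like the following; how the ReplicationPipeline instance is obtained (server.replicationPipeline below) is an assumption.

// Polling sketch; `server.replicationPipeline` is an illustrative accessor name.
const LAG_ALERT_MS = 500;

setInterval(() => {
  const health: ReplicationHealth = server.replicationPipeline.getHealth();

  if (!health.healthy) {
    console.error('Replication unhealthy on nodes:', health.unhealthyNodes);
  }
  if (health.avgLagMs > LAG_ALERT_MS) {
    console.warn(
      `Average replication lag ${health.avgLagMs}ms exceeds ${LAG_ALERT_MS}ms;`,
      'laggy nodes:', health.laggyNodes,
    );
  }
}, 10_000);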

Distributed Subscriptions

TopGun supports distributed live subscriptions for both queries and full-text search. When a client subscribes, the subscription is automatically registered across all cluster nodes, and updates flow efficiently to the client.

Subscription Protocol

The distributed subscription protocol uses four message types:

Message                  Direction                 Purpose
CLUSTER_SUB_REGISTER     Coordinator → All Nodes   Register subscription for evaluation
CLUSTER_SUB_ACK          Node → Coordinator        Acknowledge registration + initial results
CLUSTER_SUB_UPDATE       Node → Coordinator        Delta update (ENTER/UPDATE/LEAVE)
CLUSTER_SUB_UNREGISTER   Coordinator → All Nodes   Remove subscription
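
For orientation, the payloads might look roughly like the following shapes; the field names are assumptions derived from the table above, not the actual wire format.

// Rough, assumed shapes for the four subscription messages.
type ChangeType = 'ENTER' | 'UPDATE' | 'LEAVE';

interface ClusterSubRegister {
  type: 'CLUSTER_SUB_REGISTER';
  subscriptionId: string;
  kind: 'SEARCH' | 'QUERY';
  criteria: unknown;            // query filter or search expression
}

interface ClusterSubAck {
  type: 'CLUSTER_SUB_ACK';
  subscriptionId: string;
  nodeId: string;
  initialResults: unknown[];    // node-local matches at registration time
}

interface ClusterSubUpdate {
  type: 'CLUSTER_SUB_UPDATE';
  subscriptionId: string;
  nodeId: string;
  change: ChangeType;
  document: unknown;
}

interface ClusterSubUnregister {
  type: 'CLUSTER_SUB_UNREGISTER';
  subscriptionId: string;
}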

Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                      DISTRIBUTED SUBSCRIPTION FLOW                       │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  1. SUBSCRIBE                                                            │
│  ────────────                                                            │
│  Client ──SEARCH_SUB──▶ Coordinator ──CLUSTER_SUB_REGISTER──▶ All Nodes │
│                                       CLUSTER_SUB_REGISTER               │
│                                                                          │
│  2. INITIAL RESULTS                                                      │
│  ─────────────────                                                       │
│  Coordinator ◀──CLUSTER_SUB_ACK── All Nodes (includes local results)    │
│       │                                                                  │
│       └──▶ Merge results (RRF for search, dedupe for query)             │
│       └──▶ Send to Client                                                │
│                                                                          │
│  3. LIVE UPDATES                                                         │
│  ───────────────                                                         │
│  Document changes on Node B:                                             │
│       │                                                                  │
│       ▼                                                                  │
│  Node B evaluates local subscriptions                                    │
│       │                                                                  │
│       ▼                                                                  │
│  Node B ──CLUSTER_SUB_UPDATE──▶ Coordinator only (NOT broadcast)        │
│       │                                                                  │
│       ▼                                                                  │
│  Coordinator forwards to Client                                          │
│                                                                          │
│  4. UNSUBSCRIBE                                                          │
│  ────────────                                                            │
│  Client ──UNSUB──▶ Coordinator ──CLUSTER_SUB_UNREGISTER──▶ All Nodes    │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

Key Design Decisions

  1. Coordinator Pattern: The node receiving the client subscription becomes the “coordinator”; it aggregates results and forwards updates to the client
  2. Targeted Updates: Nodes send updates only to coordinators with matching subscriptions, rather than broadcasting to all nodes
  3. Local Evaluation: Each node evaluates subscriptions against its local data/index
  4. Unified Protocol: The same message structure is used for FTS and Query subscriptions, with different evaluation logic
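
A sketch of the coordinator-side merge for a QUERY subscription follows (deduplication by document id; the RRF merge used for search is omitted). The result shapes are assumptions, not TopGun's actual types.

// Illustrative coordinator-side merge of CLUSTER_SUB_ACK results for a QUERY
// subscription: deduplicate by document id across nodes.
interface NodeAck {
  nodeId: string;
  initialResults: Array<{ id: string; doc: unknown }>;
}

function mergeQueryResults(acks: NodeAck[]): Array<{ id: string; doc: unknown }> {
  const seen = new Map<string, { id: string; doc: unknown }>();
  for (const ack of acks) {
    for (const result of ack.initialResults) {
      // Keep the first copy of each document id; later duplicates from other
      // nodes (e.g. owner and backup both matching) are dropped.
      if (!seen.has(result.id)) seen.set(result.id, result);
    }
  }
  return [...seen.values()];
}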

Node Disconnect Handling

When a cluster node disconnects:

  • Its results are automatically removed from active subscriptions
  • Pending ACKs for the disconnected node are resolved
  • Local subscriptions where the disconnected node was coordinator are cleaned up
  • When the node rejoins, subscriptions can be re-registered
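
A minimal sketch of that cleanup, assuming the coordinator tracks per-node results and pending ACKs keyed by node id (the names are illustrative):

// Illustrative cleanup when a cluster node disconnects; names are assumed.
interface ActiveSubscription {
  id: string;
  resultsByNode: Map<string, unknown[]>;   // nodeId -> that node's matches
  pendingAcks: Map<string, () => void>;    // nodeId -> resolve callback
}

function onNodeDisconnect(nodeId: string, subs: ActiveSubscription[]): void {
  for (const sub of subs) {
    // Drop the disconnected node's contribution from the merged result set.
    sub.resultsByNode.delete(nodeId);

    // Resolve any ACK still pending from that node so the subscription does
    // not hang waiting for a member that will never answer.
    sub.pendingAcks.get(nodeId)?.();
    sub.pendingAcks.delete(nodeId);
  }
}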

Metrics

Distributed subscription metrics are available via Prometheus:

# Active subscriptions by type (SEARCH/QUERY)
topgun_distributed_sub_active{type="SEARCH"} 15
topgun_distributed_sub_active{type="QUERY"} 8

# Subscription registrations
topgun_distributed_sub_total{type="SEARCH",status="success"} 100
topgun_distributed_sub_total{type="QUERY",status="timeout"} 2

# Update delivery
topgun_distributed_sub_updates{direction="sent",change_type="ENTER"} 5000
topgun_distributed_sub_updates{direction="received",change_type="UPDATE"} 4800

# Latency histogram
topgun_distributed_sub_update_latency_ms{type="SEARCH",quantile="0.99"} 12

See Also: Live Queries and Full-Text Search guides for client-side usage of distributed subscriptions.

Best Practices

  1. Use Odd Node Counts: For QUORUM consistency, use 3, 5, or 7 nodes. This ensures a clear majority without split-brain (see the sketch after this list).
  2. Match Consistency to Use Case: Use EVENTUAL for real-time features, QUORUM for most data, and STRONG only for critical operations.
  3. Monitor Replication Lag: Set up alerts on topgun_replication_lag_ms. High lag may indicate network issues or overloaded nodes.
  4. Enable Cluster TLS: In production, always use TLS for inter-node communication with the clusterTls config.
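
To see why odd counts matter, quorum size is a strict majority of the nodes holding a copy of the data; the arithmetic below is general, not TopGun-specific.

// Quorum size is a strict majority of the replica set.
function quorumSize(replicaCount: number): number {
  return Math.floor(replicaCount / 2) + 1;
}

quorumSize(3); // 2: tolerates 1 node down
quorumSize(4); // 3: still tolerates only 1 node down, so the extra node adds no fault margin
quorumSize(5); // 3: tolerates 2 nodes down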