DocsGuidesEvent Journal

Event Journal

The Event Journal captures all map changes as an append-only log, enabling CDC (Change Data Capture), audit trails, event replay, and real-time activity feeds.

Durable Storage

Events persisted to PostgreSQL with configurable retention

Real-time Streaming

Subscribe to live changes via WebSocket

Time Travel

Query historical state and replay events


Use Cases

  • Audit Trail: Regulatory compliance, who changed what and when
  • CDC (Change Data Capture): Stream changes to Kafka, Elasticsearch, analytics
  • Event Replay: Rebuild state from events, debug production issues
  • Activity Feeds: Real-time notifications based on data changes
  • Debugging: Time-travel to understand data evolution

Server Configuration

Enable the Event Journal on your server:

server.ts
import { ServerCoordinator } from '@topgunbuild/server';
import { Pool } from 'pg';

const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
});

const server = new ServerCoordinator({
  port: 8080,
  storage: new PostgresAdapter({ pool }),

  // Enable Event Journal
  eventJournalEnabled: true,
  eventJournalConfig: {
    capacity: 10000,        // Max events in memory
    ttlMs: 0,               // Time-to-live (0 = infinite)
    persistent: true,       // Persist to PostgreSQL
    persistBatchSize: 100,  // Batch size for DB writes
    persistIntervalMs: 1000, // Flush interval
    includeMaps: [],        // Filter: only these maps (empty = all)
    excludeMaps: ['_internal'], // Filter: exclude these maps
  },
});

await server.start();

Configuration Options

OptionTypeDefaultDescription
capacitynumber10000Maximum events to keep in memory
ttlMsnumber0Time-to-live for in-memory events (0 = infinite)
persistentbooleantruePersist events to PostgreSQL
persistBatchSizenumber100Batch size for database writes
persistIntervalMsnumber1000Flush interval for pending writes
includeMapsstring[][]Only capture these maps (empty = all)
excludeMapsstring[][]Exclude these maps from capture

Client Subscription

Subscribe to journal events from the client:

Subscribe to Events
import { TopGunClient } from '@topgunbuild/client';

const client = new TopGunClient({
  serverUrl: 'ws://localhost:8080',
});

// Get journal reader
const journal = client.getEventJournal();

// Subscribe to all events
const unsubscribe = journal.subscribe((event) => {
  console.log(`[${event.type}] ${event.mapName}:${event.key}`);
  console.log('  Value:', event.value);
  console.log('  Previous:', event.previousValue);
  console.log('  Sequence:', event.sequence.toString());
}, {
  fromSequence: 0n,  // Start from beginning (optional)
  mapName: 'orders', // Filter by map (optional)
  types: ['PUT'],    // Filter by type (optional)
});

// Later: unsubscribe
unsubscribe();

JournalEvent Structure

interface JournalEvent {
  sequence: bigint;           // Monotonically increasing ID
  type: 'PUT' | 'UPDATE' | 'DELETE';
  mapName: string;            // Name of the map
  key: string;                // Key that changed
  value?: unknown;            // New value (undefined for DELETE)
  previousValue?: unknown;    // Previous value (for UPDATE/DELETE)
  timestamp: Timestamp;       // HLC timestamp
  nodeId: string;             // Node that made the change
  metadata?: Record<string, unknown>;
}

Reading History

Read historical events from the journal:

Read Historical Events
// Read historical events
const journal = client.getEventJournal();

// Get latest sequence
const latestSeq = await journal.getLatestSequence();
console.log('Latest sequence:', latestSeq.toString());

// Read last 100 events
const events = await journal.readFrom(latestSeq - 100n, 100);

for (const event of events) {
  console.log(`#${event.sequence}: ${event.type} ${event.mapName}:${event.key}`);
}

// Read events for specific map
const orderEvents = await journal.readMapEvents('orders', 0n, 50);

React Hook

The useEventJournal hook provides a reactive interface:

components/ActivityFeed.tsx
import { useEventJournal } from '@topgunbuild/react';

function ActivityFeed() {
  const { events, lastEvent, isSubscribed, clearEvents } = useEventJournal({
    mapName: 'orders',
    types: ['PUT', 'UPDATE'],
    maxEvents: 50,
    onEvent: (event) => {
      // Show notification for new orders
      if (event.type === 'PUT') {
        showToast(`New order: ${event.key}`);
      }
    },
  });

  return (
    <div>
      <div className="flex justify-between">
        <h2>Activity {isSubscribed && '🟢'}</h2>
        <button onClick={clearEvents}>Clear</button>
      </div>
      <ul>
        {events.map((e) => (
          <li key={e.sequence.toString()}>
            <span className="badge">{e.type}</span>
            <span>{e.mapName}:{e.key}</span>
            <time>{new Date(e.timestamp.millis).toLocaleTimeString()}</time>
          </li>
        ))}
      </ul>
    </div>
  );
}

Hook Options

OptionTypeDescription
mapNamestringFilter events by map name
types('PUT' | 'UPDATE' | 'DELETE')[]Filter by event types
fromSequencebigintStart receiving from this sequence
maxEventsnumberMax events to keep in state (default: 100)
onEvent(event: JournalEvent) => voidCallback for each new event
pausedbooleanPause subscription

Hook Return Values

PropertyTypeDescription
eventsJournalEvent[]Array of events (newest last)
lastEventJournalEvent | nullMost recent event
isSubscribedbooleanWhether subscription is active
clearEvents() => voidClear accumulated events
readFrom(seq, limit?) => Promise<JournalEvent[]>Read historical events
getLatestSequence() => Promise<bigint>Get latest sequence number

Audit Log Example

Build a complete audit log with history pagination:

components/AuditLog.tsx
import { useEventJournal } from '@topgunbuild/react';

function AuditLog({ mapName }) {
  const { events, readFrom, getLatestSequence } = useEventJournal({
    mapName,
    maxEvents: 100,
  });

  const [history, setHistory] = useState([]);
  const [loading, setLoading] = useState(false);

  const loadMore = async () => {
    setLoading(true);
    const oldestSeq = history.length > 0
      ? history[0].sequence - 1n
      : await getLatestSequence();

    const older = await readFrom(oldestSeq - 50n, 50);
    setHistory([...older, ...history]);
    setLoading(false);
  };

  return (
    <div>
      <h2>Audit Log: {mapName}</h2>
      <button onClick={loadMore} disabled={loading}>
        {loading ? 'Loading...' : 'Load More'}
      </button>
      <table>
        <thead>
          <tr>
            <th>Time</th>
            <th>Action</th>
            <th>Key</th>
            <th>Changes</th>
          </tr>
        </thead>
        <tbody>
          {[...history, ...events].map((e) => (
            <tr key={e.sequence.toString()}>
              <td>{new Date(e.timestamp.millis).toISOString()}</td>
              <td>{e.type}</td>
              <td>{e.key}</td>
              <td>
                {e.type === 'DELETE'
                  ? `Deleted: ${JSON.stringify(e.previousValue)}`
                  : JSON.stringify(e.value)}
              </td>
            </tr>
          ))}
        </tbody>
      </table>
    </div>
  );
}

CDC Export

Export changes to external systems like Kafka:

CDC to Kafka
// Server-side: Export to Kafka
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();
await producer.connect();

// Subscribe to journal and forward to Kafka
server.eventJournalService.subscribe(async (event) => {
  await producer.send({
    topic: `topgun.${event.mapName}`,
    messages: [{
      key: event.key,
      value: JSON.stringify({
        type: event.type,
        value: event.value,
        previousValue: event.previousValue,
        timestamp: event.timestamp,
      }),
      headers: {
        'sequence': event.sequence.toString(),
        'nodeId': event.nodeId,
      },
    }],
  });
});

// Or use NDJSON stream export
const stream = server.eventJournalService.exportStream({
  fromSequence: 0n,
  mapName: 'orders',
  types: ['PUT', 'UPDATE'],
});

const reader = stream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  console.log(value); // NDJSON line
}

ML Pipeline Integration

Stream events to ML services for async processing (sentiment analysis, content moderation, embeddings):

ML Pipeline Consumer
// Server-side: Send events to ML pipeline for real-time processing
server.eventJournalService.subscribe(async (event) => {
  // Only process user-generated content
  if (event.mapName !== 'posts' && event.mapName !== 'comments') return;
  if (event.type === 'DELETE') return;

  const content = event.value?.content;
  if (!content) return;

  // Send to ML service for async processing
  await fetch('https://ml-pipeline.example.com/enqueue', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      eventId: event.sequence.toString(),
      mapName: event.mapName,
      key: event.key,
      content,
      tasks: ['sentiment', 'toxicity', 'embedding'],
    }),
  });
}, { mapName: 'posts' });

// Alternative: Batch processing with windowing
const eventBuffer: JournalEvent[] = [];

server.eventJournalService.subscribe((event) => {
  eventBuffer.push(event);
}, { types: ['PUT', 'UPDATE'] });

// Flush batch every 5 seconds
setInterval(async () => {
  if (eventBuffer.length === 0) return;

  const batch = eventBuffer.splice(0, 100);
  await fetch('https://ml-pipeline.example.com/batch', {
    method: 'POST',
    body: JSON.stringify({ events: batch }),
  });
}, 5000);

Real-time vs Async: For synchronous ML processing (blocking writes until ML completes), use Interceptors instead. Event Journal is better for async pipelines where you don’t need to block the write operation.


Retention & Cleanup

Manage storage with retention policies:

Retention Policy
// Server-side: Cleanup old events
// Run this periodically (e.g., daily cron job)

// Clean events older than 30 days
const deletedCount = await server.eventJournalService.cleanupOldEvents(30);
console.log(`Cleaned up ${deletedCount} old journal events`);

// Or query with date filters
const recentEvents = await server.eventJournalService.queryFromStorage({
  mapName: 'orders',
  fromDate: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000), // Last 7 days
  limit: 1000,
});

Server-side Query Options

OptionTypeDescription
mapNamestringFilter by map name
keystringFilter by specific key
typesstring[]Filter by event types
fromSequencebigintStart sequence (inclusive)
toSequencebigintEnd sequence (inclusive)
fromDateDateStart date filter
toDateDateEnd date filter
limitnumberMaximum results (default: 100)
offsetnumberPagination offset (Note: for real-time queries use cursor-based pagination via fromSequence)

Database Schema

The Event Journal creates the following PostgreSQL table:

CREATE TABLE event_journal (
  sequence BIGINT PRIMARY KEY,
  type VARCHAR(10) NOT NULL CHECK (type IN ('PUT', 'UPDATE', 'DELETE')),
  map_name VARCHAR(255) NOT NULL,
  key VARCHAR(1024) NOT NULL,
  value JSONB,
  previous_value JSONB,
  timestamp JSONB NOT NULL,
  node_id VARCHAR(64) NOT NULL,
  metadata JSONB,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Indexes for common queries
CREATE INDEX idx_journal_map_name ON event_journal(map_name);
CREATE INDEX idx_journal_key ON event_journal(map_name, key);
CREATE INDEX idx_journal_created_at ON event_journal(created_at);
CREATE INDEX idx_journal_node_id ON event_journal(node_id);

Performance Tip: For high-volume deployments, consider time-based partitioning of the event_journal table and schedule regular cleanup using cleanupOldEvents().


Performance Considerations

AspectRecommendation
MemoryDefault 10,000 events in memory (~1-10MB depending on value size)
BatchingEvents are batched before writing to PostgreSQL (default: 100 events)
IndexingIndexes on map_name, key, created_at for efficient queries
PartitioningConsider time-based partitioning for tables with millions of events
CleanupSchedule periodic cleanup with retention policy