# Event Journal
The Event Journal captures all map changes as an append-only log, enabling CDC (Change Data Capture), audit trails, event replay, and real-time activity feeds.
- Durable Storage: Events persisted to PostgreSQL with configurable retention
- Real-time Streaming: Subscribe to live changes via WebSocket
- Time Travel: Query historical state and replay events
## Use Cases
- Audit Trail: Regulatory compliance, who changed what and when
- CDC (Change Data Capture): Stream changes to Kafka, Elasticsearch, analytics
- Event Replay: Rebuild state from events, debug production issues
- Activity Feeds: Real-time notifications based on data changes
- Debugging: Time-travel to understand data evolution
## Server Configuration
Enable the Event Journal on your server:
```typescript
import { ServerCoordinator, PostgresAdapter } from '@topgunbuild/server';
import { Pool } from 'pg';

const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
});

const server = new ServerCoordinator({
  port: 8080,
  storage: new PostgresAdapter({ pool }),

  // Enable Event Journal
  eventJournalEnabled: true,
  eventJournalConfig: {
    capacity: 10000,            // Max events in memory
    ttlMs: 0,                   // Time-to-live (0 = infinite)
    persistent: true,           // Persist to PostgreSQL
    persistBatchSize: 100,      // Batch size for DB writes
    persistIntervalMs: 1000,    // Flush interval
    includeMaps: [],            // Filter: only these maps (empty = all)
    excludeMaps: ['_internal'], // Filter: exclude these maps
  },
});

await server.start();
```

### Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
| `capacity` | number | 10000 | Maximum events to keep in memory |
| `ttlMs` | number | 0 | Time-to-live for in-memory events (0 = infinite) |
| `persistent` | boolean | true | Persist events to PostgreSQL |
| `persistBatchSize` | number | 100 | Batch size for database writes |
| `persistIntervalMs` | number | 1000 | Flush interval for pending writes (ms) |
| `includeMaps` | string[] | [] | Only capture these maps (empty = all) |
| `excludeMaps` | string[] | [] | Exclude these maps from capture |
## Client Subscription
Subscribe to journal events from the client:
```typescript
import { TopGunClient } from '@topgunbuild/client';

const client = new TopGunClient({
  serverUrl: 'ws://localhost:8080',
});

// Get journal reader
const journal = client.getEventJournal();

// Subscribe to events
const unsubscribe = journal.subscribe((event) => {
  console.log(`[${event.type}] ${event.mapName}:${event.key}`);
  console.log('  Value:', event.value);
  console.log('  Previous:', event.previousValue);
  console.log('  Sequence:', event.sequence.toString());
}, {
  fromSequence: 0n,  // Start from beginning (optional)
  mapName: 'orders', // Filter by map (optional)
  types: ['PUT'],    // Filter by type (optional)
});

// Later: unsubscribe
unsubscribe();
```

### JournalEvent Structure
```typescript
interface JournalEvent {
  sequence: bigint;         // Monotonically increasing ID
  type: 'PUT' | 'UPDATE' | 'DELETE';
  mapName: string;          // Name of the map
  key: string;              // Key that changed
  value?: unknown;          // New value (undefined for DELETE)
  previousValue?: unknown;  // Previous value (for UPDATE/DELETE)
  timestamp: Timestamp;     // HLC timestamp
  nodeId: string;           // Node that made the change
  metadata?: Record<string, unknown>;
}
```
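Because events are totally ordered by `sequence`, a subscriber can rebuild a map's state by folding events in order. A minimal sketch using the `subscribe` API shown above (the `Map` projection is illustrative, not a library feature):

```typescript
// Fold journal events into a local key/value projection of one map.
// `journal` is the reader from client.getEventJournal() above.
const projection = new Map<string, unknown>();

journal.subscribe((event) => {
  switch (event.type) {
    case 'PUT':
    case 'UPDATE':
      projection.set(event.key, event.value);
      break;
    case 'DELETE':
      projection.delete(event.key);
      break;
  }
}, {
  mapName: 'orders',
  fromSequence: 0n, // replay from the beginning to rebuild state
});
```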
## Reading History
Read historical events from the journal:
```typescript
const journal = client.getEventJournal();

// Get latest sequence
const latestSeq = await journal.getLatestSequence();
console.log('Latest sequence:', latestSeq.toString());

// Read the last 100 events (clamp so the start sequence never goes negative)
const from = latestSeq > 100n ? latestSeq - 100n : 0n;
const events = await journal.readFrom(from, 100);
for (const event of events) {
  console.log(`#${event.sequence}: ${event.type} ${event.mapName}:${event.key}`);
}

// Read events for a specific map
const orderEvents = await journal.readMapEvents('orders', 0n, 50);
```

## React Hook
The `useEventJournal` hook provides a reactive interface:
```tsx
import { useEventJournal } from '@topgunbuild/react';

function ActivityFeed() {
  const { events, lastEvent, isSubscribed, clearEvents } = useEventJournal({
    mapName: 'orders',
    types: ['PUT', 'UPDATE'],
    maxEvents: 50,
    onEvent: (event) => {
      // Show notification for new orders (showToast is your app's helper)
      if (event.type === 'PUT') {
        showToast(`New order: ${event.key}`);
      }
    },
  });

  return (
    <div>
      <div className="flex justify-between">
        <h2>Activity {isSubscribed && '🟢'}</h2>
        <button onClick={clearEvents}>Clear</button>
      </div>
      <ul>
        {events.map((e) => (
          <li key={e.sequence.toString()}>
            <span className="badge">{e.type}</span>
            <span>{e.mapName}:{e.key}</span>
            <time>{new Date(e.timestamp.millis).toLocaleTimeString()}</time>
          </li>
        ))}
      </ul>
    </div>
  );
}
```

### Hook Options
| Option | Type | Description |
|---|---|---|
| `mapName` | string | Filter events by map name |
| `types` | ('PUT' \| 'UPDATE' \| 'DELETE')[] | Filter by event types |
| `fromSequence` | bigint | Start receiving from this sequence |
| `maxEvents` | number | Max events to keep in state (default: 100) |
| `onEvent` | (event: JournalEvent) => void | Callback for each new event |
| `paused` | boolean | Pause the subscription |
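The `paused` flag freezes the feed without tearing down the subscription, e.g. while the user inspects it. A sketch (the component and its markup are illustrative):

```tsx
import { useState } from 'react';
import { useEventJournal } from '@topgunbuild/react';

// Illustrative component: freeze event delivery while the user reads the feed
function PausableFeed() {
  const [paused, setPaused] = useState(false);
  const { events } = useEventJournal({
    mapName: 'orders',
    maxEvents: 50,
    paused, // no new events are appended while true
  });

  return (
    <div>
      <button onClick={() => setPaused((p) => !p)}>
        {paused ? 'Resume' : 'Pause'}
      </button>
      <p>{events.length} events captured</p>
    </div>
  );
}
```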
### Hook Return Values
| Property | Type | Description |
|---|---|---|
| `events` | JournalEvent[] | Array of events (newest last) |
| `lastEvent` | JournalEvent \| null | Most recent event |
| `isSubscribed` | boolean | Whether the subscription is active |
| `clearEvents` | () => void | Clear accumulated events |
| `readFrom` | (seq, limit?) => Promise<JournalEvent[]> | Read historical events |
| `getLatestSequence` | () => Promise<bigint> | Get the latest sequence number |
## Audit Log Example
Build a complete audit log with history pagination:
```tsx
import { useState } from 'react';
import { useEventJournal } from '@topgunbuild/react';

function AuditLog({ mapName }) {
  const { events, readFrom, getLatestSequence } = useEventJournal({
    mapName,
    maxEvents: 100,
  });
  const [history, setHistory] = useState([]);
  const [loading, setLoading] = useState(false);

  const loadMore = async () => {
    setLoading(true);
    // Oldest sequence not yet loaded (exclusive upper bound)
    const oldestSeq = history.length > 0
      ? history[0].sequence
      : (await getLatestSequence()) + 1n;
    // Fetch the 50 events before it, clamping so the start never goes negative
    const from = oldestSeq > 50n ? oldestSeq - 50n : 0n;
    const older = await readFrom(from, 50);
    setHistory([...older, ...history]);
    setLoading(false);
  };

  return (
    <div>
      <h2>Audit Log: {mapName}</h2>
      <button onClick={loadMore} disabled={loading}>
        {loading ? 'Loading...' : 'Load More'}
      </button>
      <table>
        <thead>
          <tr>
            <th>Time</th>
            <th>Action</th>
            <th>Key</th>
            <th>Changes</th>
          </tr>
        </thead>
        <tbody>
          {[...history, ...events].map((e) => (
            <tr key={e.sequence.toString()}>
              <td>{new Date(e.timestamp.millis).toISOString()}</td>
              <td>{e.type}</td>
              <td>{e.key}</td>
              <td>
                {e.type === 'DELETE'
                  ? `Deleted: ${JSON.stringify(e.previousValue)}`
                  : JSON.stringify(e.value)}
              </td>
            </tr>
          ))}
        </tbody>
      </table>
    </div>
  );
}
```

## CDC Export
Export changes to external systems like Kafka:
```typescript
// Server-side: Export to Kafka
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();
await producer.connect();

// Subscribe to the journal and forward each change to Kafka
server.eventJournalService.subscribe(async (event) => {
  await producer.send({
    topic: `topgun.${event.mapName}`,
    messages: [{
      key: event.key,
      value: JSON.stringify({
        type: event.type,
        value: event.value,
        previousValue: event.previousValue,
        timestamp: event.timestamp,
      }),
      headers: {
        sequence: event.sequence.toString(),
        nodeId: event.nodeId,
      },
    }],
  });
});
```
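The same pattern works for other sinks. For example, a sketch that mirrors each map into Elasticsearch (assumes the v8 `@elastic/elasticsearch` client; the index naming and error handling are illustrative):

```typescript
import { Client } from '@elastic/elasticsearch';

const es = new Client({ node: 'http://localhost:9200' });

// Mirror every change into a per-map Elasticsearch index
server.eventJournalService.subscribe(async (event) => {
  const index = `topgun-${event.mapName}`;
  if (event.type === 'DELETE') {
    // Ignore "not found" if the document was never indexed
    await es.delete({ index, id: event.key }).catch(() => {});
  } else {
    await es.index({ index, id: event.key, document: event.value as object });
  }
});
```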
Alternatively, consume the journal as an NDJSON stream export:

```typescript
const stream = server.eventJournalService.exportStream({
  fromSequence: 0n,
  mapName: 'orders',
  types: ['PUT', 'UPDATE'],
});

const reader = stream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  console.log(value); // one NDJSON line per event
}
```

## ML Pipeline Integration
Stream events to ML services for async processing (sentiment analysis, content moderation, embeddings):
```typescript
// Server-side: Send events to an ML pipeline for real-time processing
server.eventJournalService.subscribe(async (event) => {
  // Only process user-generated content
  if (event.mapName !== 'posts' && event.mapName !== 'comments') return;
  if (event.type === 'DELETE') return;

  const content = (event.value as { content?: string } | undefined)?.content;
  if (!content) return;

  // Send to the ML service for async processing
  await fetch('https://ml-pipeline.example.com/enqueue', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      eventId: event.sequence.toString(),
      mapName: event.mapName,
      key: event.key,
      content,
      tasks: ['sentiment', 'toxicity', 'embedding'],
    }),
  });
});

// Alternative: Batch processing with windowing
const eventBuffer: JournalEvent[] = [];
server.eventJournalService.subscribe((event) => {
  eventBuffer.push(event);
}, { types: ['PUT', 'UPDATE'] });

// Flush a batch every 5 seconds
setInterval(async () => {
  if (eventBuffer.length === 0) return;
  const batch = eventBuffer.splice(0, 100);
  await fetch('https://ml-pipeline.example.com/batch', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    // bigint sequences are not JSON-serializable, so stringify them
    body: JSON.stringify({ events: batch }, (_key, v) =>
      typeof v === 'bigint' ? v.toString() : v),
  });
}, 5000);
```

**Real-time vs Async:** For synchronous ML processing (blocking writes until ML completes), use Interceptors instead. The Event Journal is better suited to async pipelines where the write operation does not need to block.
## Retention & Cleanup
Manage storage with retention policies:
```typescript
// Server-side: Clean up old events
// Run this periodically (e.g., from a daily cron job)

// Delete events older than 30 days
const deletedCount = await server.eventJournalService.cleanupOldEvents(30);
console.log(`Cleaned up ${deletedCount} old journal events`);

// Or query with date filters
const recentEvents = await server.eventJournalService.queryFromStorage({
  mapName: 'orders',
  fromDate: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000), // last 7 days
  limit: 1000,
});
```

### Server-side Query Options
| Option | Type | Description |
|---|---|---|
| `mapName` | string | Filter by map name |
| `key` | string | Filter by specific key |
| `types` | string[] | Filter by event types |
| `fromSequence` | bigint | Start sequence (inclusive) |
| `toSequence` | bigint | End sequence (inclusive) |
| `fromDate` | Date | Start date filter |
| `toDate` | Date | End date filter |
| `limit` | number | Maximum results (default: 100) |
| `offset` | number | Pagination offset (for real-time queries, prefer cursor-based pagination via `fromSequence`) |
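A sketch of that cursor-based pagination, advancing `fromSequence` past the last row of each page instead of using `offset` (the processing callback is a placeholder):

```typescript
// Page through all stored events for a map, using the sequence as a cursor
async function forEachStoredEvent(
  mapName: string,
  process: (event: JournalEvent) => void,
) {
  let cursor = 0n;
  const pageSize = 500;
  while (true) {
    const page = await server.eventJournalService.queryFromStorage({
      mapName,
      fromSequence: cursor,
      limit: pageSize,
    });
    if (page.length === 0) break;
    page.forEach(process);
    // Advance the cursor past the last event in this page
    cursor = page[page.length - 1].sequence + 1n;
  }
}
```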
## Database Schema
The Event Journal creates the following PostgreSQL table:
```sql
CREATE TABLE event_journal (
  sequence       BIGINT PRIMARY KEY,
  type           VARCHAR(10) NOT NULL CHECK (type IN ('PUT', 'UPDATE', 'DELETE')),
  map_name       VARCHAR(255) NOT NULL,
  key            VARCHAR(1024) NOT NULL,
  value          JSONB,
  previous_value JSONB,
  timestamp      JSONB NOT NULL,
  node_id        VARCHAR(64) NOT NULL,
  metadata       JSONB,
  created_at     TIMESTAMPTZ DEFAULT NOW()
);

-- Indexes for common queries
CREATE INDEX idx_journal_map_name ON event_journal(map_name);
CREATE INDEX idx_journal_key ON event_journal(map_name, key);
CREATE INDEX idx_journal_created_at ON event_journal(created_at);
CREATE INDEX idx_journal_node_id ON event_journal(node_id);
```
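Because events land in an ordinary table, ad-hoc SQL works alongside the journal API. A sketch using the `pg` pool from the server setup to count changes per map per day:

```typescript
// Ad-hoc analytics straight from the journal table, via the pg pool above
const { rows } = await pool.query(`
  SELECT map_name, date_trunc('day', created_at) AS day, count(*) AS changes
  FROM event_journal
  GROUP BY map_name, day
  ORDER BY day DESC
  LIMIT 20
`);
console.table(rows);
```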
**Performance Tip:** For high-volume deployments, consider time-based partitioning of the `event_journal` table and schedule regular cleanup using `cleanupOldEvents()`.
## Performance Considerations
| Aspect | Recommendation |
|---|---|
| Memory | Default 10,000 events in memory (~1-10MB depending on value size) |
| Batching | Events are batched before writing to PostgreSQL (default: 100 events) |
| Indexing | Indexes on map_name, key, created_at for efficient queries |
| Partitioning | Consider time-based partitioning for tables with millions of events |
| Cleanup | Schedule periodic cleanup with retention policy |
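Taken together, a high-volume deployment might start from a journal config along these lines; every value below is illustrative and should be tuned against your workload:

```typescript
// Illustrative tuning for a high-volume deployment (adjust per workload)
const highVolumeJournalConfig = {
  capacity: 50000,            // larger in-memory buffer to absorb bursts
  ttlMs: 60 * 60 * 1000,      // expire in-memory events after 1 hour
  persistent: true,
  persistBatchSize: 1000,     // fewer, larger PostgreSQL writes
  persistIntervalMs: 5000,    // flush at most every 5 seconds
  includeMaps: [],
  excludeMaps: ['_internal'], // skip chatty internal maps
};
```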