Interceptors
Interceptors provide server-side hooks to intercept, validate, enrich, or reject operations before and after they’re applied. Unlike Entry Processors, which run in a sandbox, interceptors have full access to Node.js APIs, so they can integrate with external services, ML models, and third-party APIs.
ML Integration
Call external ML APIs for moderation, fraud detection, recommendations
Validation & Enrichment
Validate data, add server timestamps, enrich with external data
Webhooks & Events
Trigger webhooks, send to analytics, notify external systems
Basic Usage
import { ServerCoordinator, IInterceptor, ServerOp, OpContext } from '@topgunbuild/server';

// Custom interceptor for logging all operations
const loggingInterceptor: IInterceptor = {
  name: 'logging',
  async onBeforeOp(op: ServerOp, context: OpContext) {
    console.log(`[${context.clientId}] ${op.opType} ${op.mapName}:${op.key}`);
    return op; // Pass through unchanged
  },
  async onAfterOp(op: ServerOp, context: OpContext) {
    console.log(`[${context.clientId}] Completed: ${op.mapName}:${op.key}`);
  },
};

const server = new ServerCoordinator({
  port: 8080,
  interceptors: [loggingInterceptor],
});

Validation & Enrichment
Validate incoming data and enrich it with server-side information:
// Validate and enrich data before it's stored
const validationInterceptor: IInterceptor = {
  name: 'validation',
  async onBeforeOp(op: ServerOp, context: OpContext) {
    // Only process PUT operations on 'users' map
    if (op.opType !== 'PUT' || op.mapName !== 'users') {
      return op;
    }
    const value = op.record?.value;

    // Validate email format
    // (isValidEmail is assumed to be an application-defined helper; it is not shown here)
    if (value?.email && !isValidEmail(value.email)) {
      throw new Error('Invalid email format');
    }

    // Enrich with server timestamp
    if (op.record) {
      op.record.value = {
        ...value,
        updatedAt: Date.now(),
        updatedBy: context.principal?.userId,
      };
    }
    return op;
  },
};

ML/AI Integration
Content Moderation
Call external ML services to moderate user-generated content:
// ML-powered content moderation
const moderationInterceptor: IInterceptor = {
  name: 'content-moderation',
  async onBeforeOp(op: ServerOp, context: OpContext) {
    // Only moderate posts and comments
    if (!['posts', 'comments'].includes(op.mapName)) {
      return op;
    }
    if (op.opType !== 'PUT' || !op.record?.value?.content) {
      return op;
    }

    // Call external ML service for toxicity detection
    const response = await fetch('https://ml-api.example.com/moderate', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ text: op.record.value.content }),
    });
    // Fail closed if the moderation service errors (change this if you prefer fail-open)
    if (!response.ok) {
      throw new Error('Moderation service unavailable');
    }
    const { toxicityScore, categories } = await response.json();

    // Reject highly toxic content
    if (toxicityScore > 0.9) {
      throw new Error('Content violates community guidelines');
    }

    // Enrich with moderation metadata
    op.record.value = {
      ...op.record.value,
      moderation: {
        score: toxicityScore,
        categories,
        reviewedAt: Date.now(),
      },
    };

    // Flag borderline content for human review
    if (toxicityScore > 0.7) {
      op.record.value.moderation.flaggedForReview = true;
    }
    return op;
  },
};

Fraud Detection
Real-time fraud scoring for financial transactions:
// Real-time fraud detection for transactions
const fraudInterceptor: IInterceptor = {
  name: 'fraud-detection',
  async onBeforeOp(op: ServerOp, context: OpContext) {
    if (op.mapName !== 'transactions' || op.opType !== 'PUT') {
      return op;
    }
    const transaction = op.record?.value;
    // Nothing to score if the op carries no record value
    if (!transaction) {
      return op;
    }

    // Call fraud detection ML service
    const riskAssessment = await fetch('https://fraud-api.example.com/assess', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${process.env.FRAUD_API_KEY}`,
      },
      body: JSON.stringify({
        amount: transaction.amount,
        userId: context.principal?.userId,
        merchantId: transaction.merchantId,
        timestamp: Date.now(),
      }),
    }).then(r => r.json());

    // Block high-risk transactions
    if (riskAssessment.riskScore > 0.95) {
      throw new Error('Transaction blocked: suspicious activity detected');
    }

    // Enrich transaction with risk data
    if (op.record) {
      op.record.value = {
        ...transaction,
        riskScore: riskAssessment.riskScore,
        riskFactors: riskAssessment.factors,
      };
    }
    return op;
  },
  async onAfterOp(op: ServerOp, context: OpContext) {
    // Send to analytics pipeline after successful write
    if (op.mapName === 'transactions') {
      await fetch('https://analytics.example.com/events', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          event: 'transaction_completed',
          data: op.record?.value,
          userId: context.principal?.userId,
        }),
      });
    }
  },
};

Performance Tip: For high-throughput scenarios, consider caching ML responses, using connection pooling for external APIs, or offloading non-critical enrichment to onAfterOp to avoid blocking writes.
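As one way to act on the caching suggestion above, the sketch below memoizes scores from an external service for a short time window. The `TtlCache` class, the `withCache` wrapper, and the injectable clock are illustrative assumptions and are not part of TopGun.

```typescript
// Hypothetical helper (not part of @topgunbuild/server): a small TTL cache
// for memoizing responses from an external ML service.
class TtlCache<V> {
  private entries = new Map<string, { value: V; expiresAt: number }>();
  private ttlMs: number;
  private now: () => number;

  constructor(ttlMs: number, now: () => number = Date.now) {
    this.ttlMs = ttlMs;
    this.now = now; // injectable clock, useful for testing
  }

  get(key: string): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(key); // evict lazily on read
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: V): void {
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
  }
}

// Wraps any async scorer so repeated inputs within the TTL skip the remote call.
function withCache<V>(
  cache: TtlCache<V>,
  score: (text: string) => Promise<V>,
): (text: string) => Promise<V> {
  return async (text) => {
    const hit = cache.get(text);
    if (hit !== undefined) return hit;
    const value = await score(text);
    cache.set(text, value);
    return value;
  };
}
```

Inside `onBeforeOp`, the wrapped scorer would replace the direct `fetch` call. This sketch keys on the raw text and never bounds the map size, so a production version would likely hash the key and cap the number of entries.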
Webhooks
Fire webhooks to external systems when data changes:
// Send webhooks on data changes
const webhookInterceptor: IInterceptor = {
  name: 'webhooks',
  async onAfterOp(op: ServerOp, context: OpContext) {
    // Get registered webhooks for this map
    const webhooks = await getWebhooksForMap(op.mapName);

    // Fire webhooks in parallel; deliberately not awaited so the response is not blocked
    void Promise.allSettled(
      webhooks.map(webhook =>
        fetch(webhook.url, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            'X-TopGun-Signature': signPayload(webhook.secret, op),
          },
          body: JSON.stringify({
            event: op.opType,
            mapName: op.mapName,
            key: op.key,
            value: op.record?.value,
            timestamp: Date.now(),
          }),
        })
      )
    ).then(results => {
      // Log failed webhooks
      results.forEach((result, i) => {
        if (result.status === 'rejected') {
          console.error(`Webhook failed: ${webhooks[i].url}`, result.reason);
        }
      });
    });
  },
};

Built-in Interceptors
TopGun includes ready-to-use interceptors:
import { ServerCoordinator, RateLimitInterceptor } from '@topgunbuild/server';

// Built-in rate limiting interceptor
const rateLimiter = new RateLimitInterceptor({
  maxRequestsPerMinute: 100,
  maxRequestsPerSecond: 10,
});

const server = new ServerCoordinator({
  port: 8080,
  interceptors: [rateLimiter],
});

| Interceptor | Purpose |
|---|---|
| RateLimitInterceptor | Limit requests per client per time window |
| TimestampInterceptor | Add server timestamps to all records |
Interceptor Chain
Interceptors execute in order. Use this for building processing pipelines:
// Interceptors execute in order - chain them for complex pipelines
const server = new ServerCoordinator({
  port: 8080,
  interceptors: [
    rateLimiter,           // 1. Rate limiting first
    authEnricher,          // 2. Add auth context
    validationInterceptor, // 3. Validate data
    moderationInterceptor, // 4. ML moderation
    loggingInterceptor,    // 5. Log everything
    webhookInterceptor,    // 6. Fire webhooks (onAfterOp)
  ],
});

Execution Order
- onBeforeOp runs for each interceptor in array order
- If any interceptor throws or returns null, the chain stops and the operation is not applied
- Otherwise, the operation is applied to storage
- onAfterOp runs for each interceptor in array order
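The execution order above can be sketched as a small runner. This illustrates the described behavior and is not TopGun's actual implementation: the local types are trimmed stand-ins for the interfaces in the API Reference, `runChain` and `apply` are hypothetical names, and passing each interceptor's returned op to the next one is an assumption based on the "modified or not" wording.

```typescript
// Trimmed local stand-ins for ServerOp, OpContext, and IInterceptor.
type Op = { mapName: string; key: string; opType: string };
type Ctx = { clientId: string };
interface Interceptor {
  name: string;
  onBeforeOp?(op: Op, ctx: Ctx): Promise<Op | null>;
  onAfterOp?(op: Op, ctx: Ctx): Promise<void>;
}

// Hypothetical runner. Resolves to true if the op was applied, false if it
// was dropped. An error thrown by any onBeforeOp propagates to the caller,
// which corresponds to rejecting the operation.
async function runChain(
  interceptors: Interceptor[],
  op: Op,
  ctx: Ctx,
  apply: (op: Op) => Promise<void>,
): Promise<boolean> {
  let current: Op = op;
  for (const interceptor of interceptors) {
    if (!interceptor.onBeforeOp) continue;
    const next = await interceptor.onBeforeOp(current, ctx);
    if (next === null) return false; // silently dropped, nothing stored
    current = next; // assumed: later interceptors see earlier modifications
  }
  await apply(current); // operation is applied to storage
  for (const interceptor of interceptors) {
    await interceptor.onAfterOp?.(current, ctx);
  }
  return true;
}
```

Because a throw or a `null` return exits before `apply`, cheap checks such as rate limiting are best placed early in the array, so expensive interceptors never run for operations that would be rejected anyway.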
API Reference
interface IInterceptor {
  /** Name for logging and debugging */
  name: string;

  /** Called when a client connects. Throw to reject connection. */
  onConnection?(context: ConnectionContext): Promise<void>;

  /** Called when a client disconnects. */
  onDisconnect?(context: ConnectionContext): Promise<void>;

  /**
   * Called before an operation is applied.
   * - Return op (modified or not) to continue
   * - Return null to silently drop the operation
   * - Throw an error to reject and notify the client
   */
  onBeforeOp?(op: ServerOp, context: OpContext): Promise<ServerOp | null>;

  /** Called after an operation is successfully applied. */
  onAfterOp?(op: ServerOp, context: OpContext): Promise<void>;
}

interface ServerOp {
  mapName: string;
  key: string;
  opType: 'PUT' | 'REMOVE' | 'OR_ADD' | 'OR_REMOVE';
  record?: LWWRecord<any>;
  orRecord?: ORMapRecord<any>;
}

interface OpContext {
  clientId: string;
  principal?: Principal; // Auth info (userId, roles)
  isAuthenticated: boolean;
  fromCluster: boolean; // True if op came from another node
}

Return Values for onBeforeOp
| Return | Behavior |
|---|---|
| op (original or modified) | Continue to next interceptor |
| null | Silently drop operation (no error, no storage) |
| throw Error | Reject operation, send error to client |
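To make the three outcomes concrete, here is a minimal sketch of a guard that exercises each one. The `scratch` map name, the `guardOp` function, and the trimmed local types are illustrative assumptions; in a real interceptor the decision logic would sit directly in `onBeforeOp`.

```typescript
// Trimmed local stand-ins for ServerOp and OpContext.
type Op = {
  mapName: string;
  key: string;
  opType: 'PUT' | 'REMOVE' | 'OR_ADD' | 'OR_REMOVE';
};
type Ctx = { clientId: string; isAuthenticated: boolean };

// Hypothetical decision function showing all three onBeforeOp outcomes.
function guardOp(op: Op, ctx: Ctx): Op | null {
  // Throw: reject the operation; the client is sent an error.
  if (!ctx.isAuthenticated) {
    throw new Error('Authentication required');
  }
  // Return null: silently drop; nothing is stored and no error is sent.
  if (op.mapName === 'scratch') {
    return null;
  }
  // Return op: continue to the next interceptor.
  return op;
}

// The same logic wrapped in the interceptor shape.
const guardInterceptor = {
  name: 'guard',
  async onBeforeOp(op: Op, ctx: Ctx): Promise<Op | null> {
    return guardOp(op, ctx);
  },
};
```

A silent drop gives the client no feedback, so throwing is usually the better choice whenever the client is expected to react to the refusal.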
Use Cases
| Scenario | Interceptor Hook | Example |
|---|---|---|
| Content moderation | onBeforeOp | Call ML API, reject toxic content |
| Fraud detection | onBeforeOp | Score transactions, block high-risk |
| Data enrichment | onBeforeOp | Add geolocation, currency conversion |
| Webhooks | onAfterOp | Notify external systems of changes |
| Analytics | onAfterOp | Send events to analytics pipeline |
| Audit logging | onAfterOp | Log all changes to external system |
| Rate limiting | onBeforeOp | Throttle requests per client |
| Connection tracking | onConnection/onDisconnect | Track active users |
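The connection tracking row is the only scenario above without an example elsewhere on this page, so here is a minimal sketch. The shape of `ConnectionContext` is not defined in this document; the sketch assumes it exposes at least a `clientId`. The `ActiveClients` class and `connectionTracker` name are hypothetical.

```typescript
// Assumed shape: ConnectionContext is not defined on this page, so this
// sketch assumes it carries at least a clientId.
type ConnCtx = { clientId: string };

// Hypothetical in-memory tracker for currently connected clients.
class ActiveClients {
  private ids = new Set<string>();

  add(id: string): void {
    this.ids.add(id);
  }

  remove(id: string): void {
    this.ids.delete(id);
  }

  get count(): number {
    return this.ids.size;
  }
}

const active = new ActiveClients();

// Interceptor-shaped object using only the connection hooks.
const connectionTracker = {
  name: 'connection-tracker',
  async onConnection(ctx: ConnCtx): Promise<void> {
    active.add(ctx.clientId);
  },
  async onDisconnect(ctx: ConnCtx): Promise<void> {
    active.remove(ctx.clientId);
  },
};
```

This counts clients on a single node only; in a cluster, each node would hold its own set unless the counts are aggregated elsewhere.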
Interceptors vs Entry Processors
| Aspect | Interceptors | Entry Processors |
|---|---|---|
| Execution | Server-side, full Node.js access | Sandboxed isolate |
| Network access | Yes - call external APIs | No - isolated |
| Use case | Validation, ML, webhooks | Atomic read-modify-write |
| Latency impact | Can add latency if calling external APIs | Fast, in-process |
| Security | Full trust (server code) | Untrusted client code |
When to use which: Use Entry Processors for atomic counters, conditional updates, and logic that doesn’t need external data. Use Interceptors when you need to call external services, validate against external data sources, or trigger side effects.