DocsGuidesInterceptors

Interceptors

Interceptors provide server-side hooks to intercept, validate, enrich, or reject operations before and after they’re applied. Unlike Entry Processors which run in a sandbox, interceptors have full access to Node.js APIs, enabling integration with external services, ML models, and third-party APIs.

ML Integration

Call external ML APIs for moderation, fraud detection, recommendations

Validation & Enrichment

Validate data, add server timestamps, enrich with external data

Webhooks & Events

Trigger webhooks, send to analytics, notify external systems


Basic Usage

server.ts
import { ServerCoordinator, IInterceptor, ServerOp, OpContext } from '@topgunbuild/server';

// Custom interceptor for logging all operations
const loggingInterceptor: IInterceptor = {
  name: 'logging',

  async onBeforeOp(op: ServerOp, context: OpContext) {
    console.log(`[${context.clientId}] ${op.opType} ${op.mapName}:${op.key}`);
    return op; // Pass through unchanged
  },

  async onAfterOp(op: ServerOp, context: OpContext) {
    console.log(`[${context.clientId}] Completed: ${op.mapName}:${op.key}`);
  },
};

const server = new ServerCoordinator({
  port: 8080,
  interceptors: [loggingInterceptor],
});

Validation & Enrichment

Validate incoming data and enrich it with server-side information:

Validation Interceptor
// Validate and enrich data before it's stored
const validationInterceptor: IInterceptor = {
  name: 'validation',

  async onBeforeOp(op: ServerOp, context: OpContext) {
    // Only process PUT operations on 'users' map
    if (op.opType !== 'PUT' || op.mapName !== 'users') {
      return op;
    }

    const value = op.record?.value;

    // Validate email format
    if (value?.email && !isValidEmail(value.email)) {
      throw new Error('Invalid email format');
    }

    // Enrich with server timestamp
    if (op.record) {
      op.record.value = {
        ...value,
        updatedAt: Date.now(),
        updatedBy: context.principal?.userId,
      };
    }

    return op;
  },
};

ML/AI Integration

Content Moderation

Call external ML services to moderate user-generated content:

ML Content Moderation
// ML-powered content moderation
const moderationInterceptor: IInterceptor = {
  name: 'content-moderation',

  async onBeforeOp(op: ServerOp, context: OpContext) {
    // Only moderate posts and comments
    if (!['posts', 'comments'].includes(op.mapName)) {
      return op;
    }

    if (op.opType !== 'PUT' || !op.record?.value?.content) {
      return op;
    }

    // Call external ML service for toxicity detection
    const response = await fetch('https://ml-api.example.com/moderate', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ text: op.record.value.content }),
    });

    const { toxicityScore, categories } = await response.json();

    // Reject highly toxic content
    if (toxicityScore > 0.9) {
      throw new Error('Content violates community guidelines');
    }

    // Enrich with moderation metadata
    op.record.value = {
      ...op.record.value,
      moderation: {
        score: toxicityScore,
        categories,
        reviewedAt: Date.now(),
      },
    };

    // Flag borderline content for human review
    if (toxicityScore > 0.7) {
      op.record.value.moderation.flaggedForReview = true;
    }

    return op;
  },
};

Fraud Detection

Real-time fraud scoring for financial transactions:

Fraud Detection
// Real-time fraud detection for transactions
const fraudInterceptor: IInterceptor = {
  name: 'fraud-detection',

  async onBeforeOp(op: ServerOp, context: OpContext) {
    if (op.mapName !== 'transactions' || op.opType !== 'PUT') {
      return op;
    }

    const transaction = op.record?.value;

    // Call fraud detection ML service
    const riskAssessment = await fetch('https://fraud-api.example.com/assess', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${process.env.FRAUD_API_KEY}`,
      },
      body: JSON.stringify({
        amount: transaction.amount,
        userId: context.principal?.userId,
        merchantId: transaction.merchantId,
        timestamp: Date.now(),
      }),
    }).then(r => r.json());

    // Block high-risk transactions
    if (riskAssessment.riskScore > 0.95) {
      throw new Error('Transaction blocked: suspicious activity detected');
    }

    // Enrich transaction with risk data
    if (op.record) {
      op.record.value = {
        ...transaction,
        riskScore: riskAssessment.riskScore,
        riskFactors: riskAssessment.factors,
      };
    }

    return op;
  },

  async onAfterOp(op: ServerOp, context: OpContext) {
    // Send to analytics pipeline after successful write
    if (op.mapName === 'transactions') {
      await fetch('https://analytics.example.com/events', {
        method: 'POST',
        body: JSON.stringify({
          event: 'transaction_completed',
          data: op.record?.value,
          userId: context.principal?.userId,
        }),
      });
    }
  },
};

Performance Tip: For high-throughput scenarios, consider caching ML responses, using connection pooling for external APIs, or offloading non-critical enrichment to onAfterOp to avoid blocking writes.


Webhooks

Fire webhooks to external systems when data changes:

Webhook Interceptor
// Send webhooks on data changes
const webhookInterceptor: IInterceptor = {
  name: 'webhooks',

  async onAfterOp(op: ServerOp, context: OpContext) {
    // Get registered webhooks for this map
    const webhooks = await getWebhooksForMap(op.mapName);

    // Fire webhooks in parallel (don't block the response)
    Promise.allSettled(
      webhooks.map(webhook =>
        fetch(webhook.url, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            'X-TopGun-Signature': signPayload(webhook.secret, op),
          },
          body: JSON.stringify({
            event: op.opType,
            mapName: op.mapName,
            key: op.key,
            value: op.record?.value,
            timestamp: Date.now(),
          }),
        })
      )
    ).then(results => {
      // Log failed webhooks
      results.forEach((result, i) => {
        if (result.status === 'rejected') {
          console.error(`Webhook failed: ${webhooks[i].url}`, result.reason);
        }
      });
    });
  },
};

Built-in Interceptors

TopGun includes ready-to-use interceptors:

Rate Limiting
import { RateLimitInterceptor } from '@topgunbuild/server';

// Built-in rate limiting interceptor
const rateLimiter = new RateLimitInterceptor({
  maxRequestsPerMinute: 100,
  maxRequestsPerSecond: 10,
});

const server = new ServerCoordinator({
  port: 8080,
  interceptors: [rateLimiter],
});
InterceptorPurpose
RateLimitInterceptorLimit requests per client per time window
TimestampInterceptorAdd server timestamps to all records

Interceptor Chain

Interceptors execute in order. Use this for building processing pipelines:

Interceptor Chain
// Interceptors execute in order - chain them for complex pipelines
const server = new ServerCoordinator({
  port: 8080,
  interceptors: [
    rateLimiter,          // 1. Rate limiting first
    authEnricher,         // 2. Add auth context
    validationInterceptor, // 3. Validate data
    moderationInterceptor, // 4. ML moderation
    loggingInterceptor,   // 5. Log everything
    webhookInterceptor,   // 6. Fire webhooks (onAfterOp)
  ],
});

Execution Order

  1. onBeforeOp runs for each interceptor in array order
  2. If any interceptor throws or returns null, the chain stops
  3. Operation is applied to storage
  4. onAfterOp runs for each interceptor in array order

API Reference

IInterceptor Interface
interface IInterceptor {
  /** Name for logging and debugging */
  name: string;

  /** Called when a client connects. Throw to reject connection. */
  onConnection?(context: ConnectionContext): Promise<void>;

  /** Called when a client disconnects. */
  onDisconnect?(context: ConnectionContext): Promise<void>;

  /**
   * Called before an operation is applied.
   * - Return op (modified or not) to continue
   * - Return null to silently drop the operation
   * - Throw an error to reject and notify the client
   */
  onBeforeOp?(op: ServerOp, context: OpContext): Promise<ServerOp | null>;

  /** Called after an operation is successfully applied. */
  onAfterOp?(op: ServerOp, context: OpContext): Promise<void>;
}

interface ServerOp {
  mapName: string;
  key: string;
  opType: 'PUT' | 'REMOVE' | 'OR_ADD' | 'OR_REMOVE';
  record?: LWWRecord<any>;
  orRecord?: ORMapRecord<any>;
}

interface OpContext {
  clientId: string;
  principal?: Principal;  // Auth info (userId, roles)
  isAuthenticated: boolean;
  fromCluster: boolean;   // True if op came from another node
}

Return Values for onBeforeOp

ReturnBehavior
op (original or modified)Continue to next interceptor
nullSilently drop operation (no error, no storage)
throw ErrorReject operation, send error to client

Use Cases

ScenarioInterceptor HookExample
Content moderationonBeforeOpCall ML API, reject toxic content
Fraud detectiononBeforeOpScore transactions, block high-risk
Data enrichmentonBeforeOpAdd geolocation, currency conversion
WebhooksonAfterOpNotify external systems of changes
AnalyticsonAfterOpSend events to analytics pipeline
Audit loggingonAfterOpLog all changes to external system
Rate limitingonBeforeOpThrottle requests per client
Connection trackingonConnection/onDisconnectTrack active users

Interceptors vs Entry Processors

AspectInterceptorsEntry Processors
ExecutionServer-side, full Node.js accessSandboxed isolate
Network accessYes - call external APIsNo - isolated
Use caseValidation, ML, webhooksAtomic read-modify-write
Latency impactCan add latency if calling external APIsFast, in-process
SecurityFull trust (server code)Untrusted client code

When to use which: Use Entry Processors for atomic counters, conditional updates, and logic that doesn’t need external data. Use Interceptors when you need to call external services, validate against external data sources, or trigger side effects.