watch 方法 API 文档

版本: v1.1.0
状态: ✅ 已实现


📑 目录


概述

watch() 方法直接返回 MongoDB Change Streams 的原生 ChangeStream<T> 对象,支持实时监听集合的数据变更。如需自动断点续传、多目标同步或缓存失效,请结合 ChangeStreamSyncManager 使用。


基本用法

import MonSQLize from 'monsqlize';
const db = new MonSQLize({
  type: 'mongodb',
  databaseName: 'mydb',
  config: { uri: 'mongodb://localhost:27017' }
});

await db.connect();
const collection = db.dbInstance.collection('users');

// 监听所有变更
const watcher = collection.watch();

watcher.on('change', (change) => {
  console.log('数据变更:', change.operationType);
  console.log('文档:', change.fullDocument);
});

API 参考

collection.watch([pipeline], [options])

监听集合的数据变更。

参数:

  • pipeline (Array, 可选): 聚合管道,用于过滤事件
  • options (Object, 可选): 配置选项

返回值: ChangeStream<TSchema> — MongoDB 驱动原生 ChangeStream 对象

⚠️ collection.watch() 直接返回 MongoDB 驱动的 ChangeStream,没有额外封装。 请参考 MongoDB ChangeStream 官方文档


配置选项

MongoDB 原生选项

选项类型默认值说明
fullDocumentstring'updateLookup'返回完整文档 ('default' | 'updateLookup' | 'whenAvailable' | 'required')
fullDocumentBeforeChangestring-返回修改前的文档 ('off' | 'whenAvailable' | 'required')
resumeAfterObject-从指定 resumeToken 继续
startAfterObject-从指定 resumeToken 开始(事务友好)
startAtOperationTimeTimestamp-从指定时间开始
maxAwaitTimeMSnumber-最大等待时间(毫秒)
batchSizenumber-批处理大小

ChangeStream 原生方法

watch() 返回的是 MongoDB 驱动的 ChangeStream<T> 对象,支持以下原生 API:

cs.on(event, handler)

监听事件(EventEmitter 接口)。

事件列表:

  • 'change': 数据变更
  • 'error': 错误
  • 'close': 关闭
  • 'end': 流结束

cs.once(event, handler)

监听事件(一次性)。

cs.close()

关闭 ChangeStream。

返回值: Promise<void>

cs.closed

只读属性,检查 ChangeStream 是否已关闭。

类型: boolean

cs.resumeToken

只读属性,获取最新 resumeToken(用于断点续传)。

类型: unknown

cs.next()

显式获取下一个变更事件(迭代器模式)。

返回值: Promise<ChangeStreamDocument<T>>

💡 如需断点续传、多目标同步或自动重连,请使用 ChangeStreamSyncManager


使用示例

示例 1: 基础监听

const watcher = collection.watch();

watcher.on('change', (change) => {
  console.log('操作类型:', change.operationType);
  console.log('文档ID:', change.documentKey._id);
});

示例 2: 过滤事件

// 只监听 insert 和 update
const watcher = collection.watch([
  { $match: { operationType: { $in: ['insert', 'update'] } } }
]);

watcher.on('change', (change) => {
  console.log('新增或修改:', change.fullDocument);
});

示例 3: 错误处理

const cs = collection.watch();

cs.on('error', (error) => {
  console.error('Change Stream 错误:', error);
});

cs.on('close', () => {
  console.log('Change Stream 已关闭');
});

示例 4: 统计监控(通过 ChangeStreamSyncManager)

如需内置统计(eventCount / syncedCount / errorCount),请使用 ChangeStreamSyncManager

import MonSQLize from 'monsqlize';

const syncManager = new MonSQLize.ChangeStreamSyncManager({ db, config: { ... } });
await syncManager.start();

setInterval(() => {
  const stats = syncManager.getStats();
  console.log('已同步事件:', stats.syncedCount, '错误:', stats.errorCount);
}, 60000);

示例 5: 优雅关闭

const cs = collection.watch();

process.on('SIGTERM', async () => {
  console.log('正在关闭 watch...');
  await cs.close();
  console.log('watch 已关闭');
  process.exit(0);
});

缓存失效集成

⚠️ collection.watch() 本身不提供内置的缓存失效功能。如需将 watch 与缓存集成,有两种方案:

方案一:手动处理(推荐轻量场景)

const cs = collection.watch();

cs.on('change', async (change) => {
  // 根据操作类型手动失效相应缓存键
  if (['insert', 'update', 'replace', 'delete'].includes(change.operationType)) {
    myCache.delete('user-list');
    myCache.delete(`user:${change.documentKey?._id}`);
  }
});

方案二:ChangeStreamSyncManager(推荐生产场景)

ChangeStreamSyncManager 内置了断点续传、多目标同步和统计能力,可在 apply 回调中处理缓存:

const syncManager = new MonSQLize.ChangeStreamSyncManager({
  db,
  config: {
    enabled: true,
    targets: [{
      name: 'cache-invalidation',
      apply: async (event) => {
        // 在 apply 中处理缓存失效
        myCache.delete('user-list');
      }
    }]
  }
});
await syncManager.start();

跨实例同步: 如需分布式缓存同步,请使用 DistributedCacheInvalidator,它支持通过 Pub/Sub 广播失效信号到其他实例。


注意事项

1. MongoDB 版本要求

Change Streams 需要 MongoDB 4.0+ 并且是副本集或分片集群环境。

单节点环境会报错:

Error: The $changeStream stage is only supported on replica sets

解决方案:

开发/测试环境 - 使用 mongodb-memory-server:

const msq = new MonSQLize({
  type: 'mongodb',
  databaseName: 'mydb',
  config: { 
    useMemoryServer: true,
    memoryServerOptions: {
      instance: {
        replSet: 'rs0'  // 启用副本集模式
      }
    }
  }
});

await msq.connect();
const collection = msq.dbInstance.collection('users');

// ✅ 现在可以使用 watch
const watcher = collection.watch();

生产环境 - 使用副本集或分片集群:

const msq = new MonSQLize({
  type: 'mongodb',
  databaseName: 'mydb',
  config: {
    uri: 'mongodb://host1:27017,host2:27017,host3:27017/mydb?replicaSet=rs0'
  }
});

2. 性能影响

  • watch 本身对性能影响很小(MongoDB 原生支持)
  • ChangeStream 监听是异步的,不阻塞主流程

3. resumeToken 过期

MongoDB oplog 有大小限制,resumeToken 可能过期(默认几小时)。

处理建议:

  • 监听 error 事件,检测 ChangeStreamHistoryLost 错误
  • 关闭当前 ChangeStream 并重新调用 collection.watch()(不带 resumeAfter
  • 如需自动处理断点续传,请使用 ChangeStreamSyncManager

4. 内存管理

长期运行的 watch 需要注意:

  • 正确调用 watcher.close() 释放资源
  • 监听 process 信号优雅关闭
  • 不要创建过多 watcher(每个集合 1-2 个即可)

故障排查

问题 1: watch 立即关闭

原因: MongoDB 不是副本集环境

解决: 使用副本集或 mongodb-memory-server

问题 2: ChangeStream 意外关闭

原因: 网络不稳定或 MongoDB 负载过高,ChangeStream 会自动关闭并触发 close 事件。

排查:

const cs = collection.watch();

cs.on('close', () => {
  console.warn('ChangeStream 已关闭,请检查网络和 MongoDB 状态');
  // 如需自动重连,可在此重新调用 collection.watch()
});

cs.on('error', (err) => {
  console.error('ChangeStream 错误:', err.message);
});

问题 3: 缓存集成

参见 缓存失效集成 章节。


watch 事件 vs 全局事件

区别说明

monSQLize 有两套事件系统:

1. 全局事件(msq 对象):

  • 监听对象:应用的查询操作
  • 事件类型:slow-query, query, connected, error, closed
  • 适用场景:性能监控、运维告警
  • 文档:事件系统

2. watch 事件(ChangeStream 对象):

  • 监听对象:MongoDB 数据变更
  • 事件类型:change, error, close, end(MongoDB 原生事件)
  • 适用场景:实时数据同步、缓存失效
  • 文档:本文档

使用场景对比

需求使用
监控应用查询性能msq.on('slow-query', ...)
调试所有查询操作msq.on('query', ...)
监听数据变更cs.on('change', ...)
应用层缓存失效cs.on('change', ...) + 手动 cache.delete
跨系统数据同步cs.on('change', ...)ChangeStreamSyncManager

示例:同时使用

const msq = new MonSQLize({
  type: 'mongodb',
  databaseName: 'shop',
  config: { uri: 'mongodb://localhost:27017' },
  slowQueryMs: 100
});

await msq.connect();

// ✅ 监听慢查询(运维)
msq.on('slow-query', (meta) => {
  console.warn('慢查询:', meta.operation, meta.duration + 'ms');
  // 发送告警
});

// ✅ 监听数据变更(业务)
const collection = msq.dbInstance.collection('products');
const cs = collection.watch();

cs.on('change', (change) => {
  console.log('数据变更:', change.operationType);
  // 缓存失效、业务通知
});

相关文档


版本历史

  • v1.1.0 (2025-12): 首次发布 watch 功能