改变流

¥Change Streams

改变流 允许你监听给定模型集合中的文档甚至整个数据库中的文档的更新。与 中间件 不同,更改流是 MongoDB 服务器构造,这意味着它们从任何地方获取更改。即使你从 MongoDB GUI 更新文档,你的 Mongoose 更改流也会收到通知。

¥Change streams let you listen for updates to documents in a given model's collection, or even documents in an entire database. Unlike middleware, change streams are a MongoDB server construct, which means they pick up changes from anywhere. Even if you update a document from a MongoDB GUI, your Mongoose change stream will be notified.

watch() 函数创建变更流。更新文档时,更改流会发出 'data' 事件。

¥The watch() function creates a change stream. Change streams emit a 'data' event when a document is updated.

const Person = mongoose.model('Person', new mongoose.Schema({ name: String }));

// Create a change stream. The 'change' event gets emitted when there's a
// change in the database. Print what the change stream emits.
Person.watch().
  on('change', data => console.log(data));

// Insert a doc, will trigger the change stream handler above
await Person.create({ name: 'Axl Rose' });

上面的脚本将打印如下输出:

¥The above script will print output that looks like:

{ _id: { _data: '8262408DAC000000012B022C0100296E5A10042890851837DB4792BE6B235E8B85489F46645F6964006462408DAC6F5C42FF5EE087A20004' }, operationType: 'insert', clusterTime: new Timestamp({ t: 1648397740, i: 1 }), fullDocument: { _id: new ObjectId("62408dac6f5c42ff5ee087a2"), name: 'Axl Rose', __v: 0 }, ns: { db: 'test', coll: 'people' }, documentKey: { _id: new ObjectId("62408dac6f5c42ff5ee087a2") } }

请注意,你必须连接到 MongoDB 副本集或分片集群才能使用更改流。如果你在连接到独立 MongoDB 服务器时尝试调用 watch(),你将收到以下错误。

¥Note that you must be connected to a MongoDB replica set or sharded cluster to use change streams. If you try to call watch() when connected to a standalone MongoDB server, you'll get the below error.

MongoServerError: The $changeStream stage is only supported on replica sets

如果你在生产中使用 watch(),我们建议使用 MongoDB 阿特拉斯。对于本地开发,我们建议 mongodb-memory-serverrun-rs 在本地启动副本集。

¥If you're using watch() in production, we recommend using MongoDB Atlas. For local development, we recommend mongodb-memory-server or run-rs to start a replica set locally.

使用 next() 进行迭代

¥Iterating using next()

如果你想迭代 AWS Lambda 函数 中的更改流,请不要使用事件触发器来监听更改流。你需要确保在 Lambda 函数执行完毕后关闭更改流,因为如果在更改流从 MongoDB 提取数据时 Lambda 停止容器,你的更改流可能会处于不一致的状态。

¥If you want to iterate through a change stream in a AWS Lambda function, do not use event emitters to listen to the change stream. You need to make sure you close your change stream when your Lambda function is done executing, because your change stream may end up in an inconsistent state if Lambda stops your container while the change stream is pulling data from MongoDB.

更改流还具有 next() 函数,可让你显式等待下一个更改的到来。使用 resumeAfter 跟踪最后一个更改流停止的位置,并添加超时以确保你的处理程序在没有更改进来时不会永远等待。

¥Change streams also have a next() function that lets you explicitly wait for the next change to come in. Use resumeAfter to track where the last change stream left off, and add a timeout to make sure your handler doesn't wait forever if no changes come in.

let resumeAfter = undefined;

exports.handler = async(event, context) => {
  // add this so that we can re-use any static/global variables between function calls if Lambda
  // happens to re-use existing containers for the invocation.
  context.callbackWaitsForEmptyEventLoop = false;

  await connectToDatabase();

  const changeStream = await Country.watch([], { resumeAfter });

  // Change stream `next()` will wait forever if there are no changes. So make sure to
  // stop listening to the change stream after a fixed period of time.
  const timeoutPromise = new Promise(resolve => setTimeout(() => resolve(false), 1000));
  let doc = null;
  while (doc = await Promise.race([changeStream.next(), timeoutPromise])) {
    console.log('Got', doc);
  }

  // `resumeToken` tells you where the change stream left off, so next function instance
  // can pick up any changes that happened in the meantime.
  resumeAfter = changeStream.resumeToken;
  await changeStream.close();
};