Transactions in Mongoose

Transactions let you execute multiple operations in isolation and, if one of them fails, undo all of them. This guide will get you started using transactions with Mongoose.

Getting Started with Transactions

If you haven't already, import mongoose:

import mongoose from 'mongoose';

To create a transaction, you first need to create a session using Mongoose#startSession() or Connection#startSession().

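// Option 1: using Mongoose's default connection
const session = await mongoose.startSession();

// Option 2: using a custom connection. The session gets a different
// name here so both options can appear in the same scope.
const db = await mongoose.createConnection(mongodbUri).asPromise();
const customSession = await db.startSession();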

In practice, you should use either the session.withTransaction() helper or Mongoose's Connection#transaction() function to run a transaction. The session.withTransaction() helper handles:

  • Creating a transaction
  • Committing the transaction if it succeeds
  • Aborting the transaction if your operation throws
  • Retrying in the event of a transient transaction error

let session = null;
return Customer.createCollection().
  then(() => Customer.startSession()).
  // The `withTransaction()` function's first parameter is a function
  // that returns a promise.
  then(_session => {
    session = _session;
    return session.withTransaction(() => {
      return Customer.create([{ name: 'Test' }], { session: session });
    });
  }).
  then(() => Customer.countDocuments()).
  then(count => assert.strictEqual(count, 1)).
  then(() => session.endSession());

For more information on the ClientSession#withTransaction() function, see the MongoDB Node.js driver docs.
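To make the four responsibilities listed above concrete, here is a simplified sketch of the kind of loop such a helper runs. It is not the driver's actual implementation: `withTransactionSketch` and `maxAttempts` are hypothetical names, and the real helper covers cases this sketch ignores, such as retrying the commit itself. The sketch assumes that errors expose a `hasErrorLabel()` method, as MongoDB driver errors do.

```javascript
// Illustration only; in real code prefer session.withTransaction().
// `withTransactionSketch` and `maxAttempts` are hypothetical names.
async function withTransactionSketch(session, fn, maxAttempts = 3) {
  for (let attempt = 1; ; attempt++) {
    // 1. Create the transaction
    session.startTransaction();

    let result;
    try {
      result = await fn(session);
    } catch (err) {
      // 3. Abort the transaction because the callback threw
      await session.abortTransaction();

      // 4. Retry only when the error is labeled as transient
      const isTransient = typeof err?.hasErrorLabel === 'function' &&
        err.hasErrorLabel('TransientTransactionError');
      if (isTransient && attempt < maxAttempts) {
        continue;
      }
      throw err;
    }

    // 2. Commit the transaction because the callback succeeded
    await session.commitTransaction();
    return result;
  }
}
```

Because the callback may run more than once, it should be safe to repeat: avoid side effects outside the database that cannot be undone.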

Mongoose's Connection#transaction() function is a wrapper around withTransaction() that integrates Mongoose change tracking with transactions. For example, suppose you save() a document in a transaction that later fails. The changes in that document are not persisted to MongoDB. The Connection#transaction() function informs Mongoose change tracking that the save() was rolled back, and marks all fields that were changed in the transaction as modified.

const doc = new Person({ name: 'Will Riker' });

await db.transaction(async function setRank(session) {
  doc.name = 'Captain';
  await doc.save({ session });
  doc.isNew; // false

  // Throw an error to abort the transaction
  throw new Error('Oops!');
}, { readPreference: 'primary' }).catch(() => {});

// true, `transaction()` reset the document's state because the
// transaction was aborted.
doc.isNew;

Note About Parallelism in Transactions

Running operations in parallel is not supported during a transaction. Using Promise.all, Promise.allSettled, Promise.race, etc. to parallelize operations inside a transaction is undefined behaviour and should be avoided.
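One way to follow this advice is to await each operation before starting the next. The sketch below is a minimal illustration: `runInOrder` is a hypothetical helper, not part of Mongoose, and each entry in `operations` is assumed to be a function that takes the session and returns a promise.

```javascript
// Hypothetical helper: runs each operation to completion before starting
// the next one, so no two operations use the session at the same time.
async function runInOrder(session, operations) {
  const results = [];
  for (const operation of operations) {
    results.push(await operation(session));
  }
  return results;
}

// Possible usage inside a transaction (assumes a `Customer` model):
//
//   await session.withTransaction(() => runInOrder(session, [
//     s => Customer.create([{ name: 'A' }], { session: s }),
//     s => Customer.create([{ name: 'B' }], { session: s })
//   ]));
```

A plain sequence of `await` statements achieves the same thing; the helper only makes the contrast with Promise.all explicit.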

With Mongoose Documents and save()

If you get a Mongoose document from findOne() or find() using a session, the document keeps a reference to the session and uses that session for save().

To get or set the session associated with a given document, use doc.$session().

const User = db.model('User', new Schema({ name: String }));

let session = null;
return User.createCollection().
  then(() => db.startSession()).
  then(_session => {
    session = _session;
    return User.create({ name: 'foo' });
  }).
  then(() => {
    session.startTransaction();
    return User.findOne({ name: 'foo' }).session(session);
  }).
  then(user => {
    // Getter/setter for the session associated with this document.
    assert.ok(user.$session());
    user.name = 'bar';
    // By default, `save()` uses the associated session
    return user.save();
  }).
  then(() => User.findOne({ name: 'bar' })).
  // Won't find the doc because `save()` is part of an uncommitted transaction
  then(doc => assert.ok(!doc)).
  then(() => session.commitTransaction()).
  then(() => session.endSession()).
  then(() => User.findOne({ name: 'bar' })).
  then(doc => assert.ok(doc));

With the Aggregation Framework

The Model.aggregate() function also supports transactions. Mongoose aggregations have a session() helper that sets the session option. Below is an example of executing an aggregation within a transaction.

const Event = db.model('Event', new Schema({ createdAt: Date }), 'Event');

let session = null;
return Event.createCollection().
  then(() => db.startSession()).
  then(_session => {
    session = _session;
    session.startTransaction();
    return Event.insertMany([
      { createdAt: new Date('2018-06-01') },
      { createdAt: new Date('2018-06-02') },
      { createdAt: new Date('2017-06-01') },
      { createdAt: new Date('2017-05-31') }
    ], { session: session });
  }).
  then(() => Event.aggregate([
    {
      $group: {
        _id: {
          month: { $month: '$createdAt' },
          year: { $year: '$createdAt' }
        },
        count: { $sum: 1 }
      }
    },
    { $sort: { count: -1, '_id.year': -1, '_id.month': -1 } }
  ]).session(session)).
  then(res => assert.deepEqual(res, [
    { _id: { month: 6, year: 2018 }, count: 2 },
    { _id: { month: 6, year: 2017 }, count: 1 },
    { _id: { month: 5, year: 2017 }, count: 1 }
  ])).
  then(() => session.commitTransaction()).
  then(() => session.endSession());

Using AsyncLocalStorage

One major pain point with transactions in Mongoose is that you need to remember to set the session option on every operation. If you don't, your operation will execute outside of the transaction. Mongoose 8.4 is able to set the session option on all operations within a Connection.prototype.transaction() executor function using Node's AsyncLocalStorage API. To enable this feature, set the transactionAsyncLocalStorage option using mongoose.set('transactionAsyncLocalStorage', true).

mongoose.set('transactionAsyncLocalStorage', true);

const Test = mongoose.model('Test', new mongoose.Schema({ name: String }));

const doc = new Test({ name: 'test' });

// Save a new doc in a transaction that aborts
await connection.transaction(async() => {
  await doc.save(); // Notice no session here
  throw new Error('Oops');
}).catch(() => {});

// false, `save()` was rolled back
await Test.exists({ _id: doc._id });

With transactionAsyncLocalStorage, you no longer need to pass sessions to every operation. Mongoose will add the session by default under the hood.
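To illustrate the underlying mechanism, here is a minimal sketch of how Node's AsyncLocalStorage can make a value available to every call inside a callback without passing it explicitly. This is not Mongoose's internal code: `sessionStore`, `runWithSession`, and `fakeSave` are hypothetical names, and the session is assumed to be any plain object.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

// Hypothetical store holding the "current session" for an async call chain.
const sessionStore = new AsyncLocalStorage();

// Runs `fn` with `session` visible to everything it calls, including
// code that executes after an `await`.
function runWithSession(session, fn) {
  return sessionStore.run(session, fn);
}

// Stand-in for a database operation: uses an explicit session if one is
// given, otherwise falls back to the store (undefined outside run()).
async function fakeSave(options = {}) {
  await null; // simulate asynchronous work
  return options.session ?? sessionStore.getStore();
}
```

Calling `fakeSave()` inside `runWithSession(session, ...)` resolves to `session`, while calling it outside resolves to `undefined`. This mirrors why operations outside the transaction() executor are not affected by the option.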

Advanced Usage

Advanced users who want more fine-grained control over when they commit or abort transactions can use session.startTransaction() to start a transaction:

const Customer = db.model('Customer', new Schema({ name: String }));

let session = null;
return Customer.createCollection().
  then(() => db.startSession()).
  then(_session => {
    session = _session;
    // Start a transaction
    session.startTransaction();
    // This `create()` is part of the transaction because of the `session`
    // option.
    return Customer.create([{ name: 'Test' }], { session: session });
  }).
  // Transactions execute in isolation, so unless you pass a `session`
  // to `findOne()` you won't see the document until the transaction
  // is committed.
  then(() => Customer.findOne({ name: 'Test' })).
  then(doc => assert.ok(!doc)).
  // This `findOne()` will return the doc, because passing the `session`
  // means this `findOne()` will run as part of the transaction.
  then(() => Customer.findOne({ name: 'Test' }).session(session)).
  then(doc => assert.ok(doc)).
  // Once the transaction is committed, the write operation becomes
  // visible outside of the transaction.
  then(() => session.commitTransaction()).
  then(() => Customer.findOne({ name: 'Test' })).
  then(doc => assert.ok(doc)).
  then(() => session.endSession());

You can also use session.abortTransaction() to abort a transaction:

let session = null;
return Customer.createCollection().
  then(() => Customer.startSession()).
  then(_session => {
    session = _session;
    session.startTransaction();
    return Customer.create([{ name: 'Test' }], { session: session });
  }).
  then(() => Customer.create([{ name: 'Test2' }], { session: session })).
  then(() => session.abortTransaction()).
  then(() => Customer.countDocuments()).
  then(count => assert.strictEqual(count, 0)).
  then(() => session.endSession());
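
When managing a transaction manually, a common pattern is to commit on success, abort on failure, and always end the session. The sketch below assumes a session exposing the startTransaction(), commitTransaction(), abortTransaction(), and endSession() methods used above; `runManualTransaction` is a hypothetical helper name. Unlike session.withTransaction(), it does not retry on transient errors.

```javascript
// Hypothetical helper: runs `work` inside a manually managed transaction.
async function runManualTransaction(session, work) {
  try {
    session.startTransaction();

    let result;
    try {
      result = await work(session);
    } catch (err) {
      // The work failed, so roll back everything it wrote, then rethrow
      await session.abortTransaction();
      throw err;
    }

    // The work succeeded, so make its writes visible to other sessions
    await session.commitTransaction();
    return result;
  } finally {
    // Release the session whether the transaction committed or aborted
    await session.endSession();
  }
}
```

With the `Customer` model from the examples above, a call might look like `await runManualTransaction(await db.startSession(), s => Customer.create([{ name: 'Test' }], { session: s }))`.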