Transactions in Mongoose

Transactions were introduced in MongoDB 4.0 and Mongoose 5.2.0. They let you execute multiple operations in isolation and potentially undo all of them if one fails. This guide will get you started using transactions with Mongoose.

Getting Started with Transactions

If you haven't already, import mongoose:

import mongoose from 'mongoose';

To create a transaction, you first need to create a session using Mongoose#startSession() or Connection#startSession().

// Using Mongoose's default connection
const session = await mongoose.startSession();

// Alternatively, using a custom connection (use one or the other)
const db = await mongoose.createConnection(mongodbUri).asPromise();
const session = await db.startSession();

In practice, you should run transactions with either the session.withTransaction() helper or Mongoose's Connection#transaction() function. The session.withTransaction() helper handles:

  • Creating a transaction

  • Committing the transaction if it succeeds

  • Aborting the transaction if your operation throws

  • Retrying in the event of a transient transaction error

let session = null;
return Customer.createCollection().
  then(() => Customer.startSession()).
  // The `withTransaction()` function's first parameter is a function
  // that returns a promise.
  then(_session => {
    session = _session;
    return session.withTransaction(() => {
      return Customer.create([{ name: 'Test' }], { session: session });
    });
  }).
  then(() => Customer.countDocuments()).
  then(count => assert.strictEqual(count, 1)).
  then(() => session.endSession());

For more information on the ClientSession#withTransaction() function, please see the MongoDB Node.js driver docs.
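The promise chain above can also be written with async/await. The following is a minimal sketch rather than code from the Mongoose docs: the helper name createCustomerInTransaction is hypothetical, and Customer is assumed to be a Mongoose model passed in by the caller. It uses only the calls already shown above: Model.startSession(), session.withTransaction(), Model.create() with the session option, and session.endSession().

```javascript
// Hypothetical helper: inserts one customer inside a transaction.
// `Customer` is assumed to be a Mongoose model supplied by the caller.
async function createCustomerInTransaction(Customer, name) {
  const session = await Customer.startSession();
  try {
    // `withTransaction()` commits if the callback resolves and aborts
    // if the callback throws.
    await session.withTransaction(async () => {
      // Pass the documents as an array so that `{ session }` is read as
      // options rather than as a second document.
      await Customer.create([{ name }], { session });
    });
  } finally {
    // End the session whether the transaction committed or aborted.
    await session.endSession();
  }
}
```

Wrapping the call in try/finally ensures the session is ended even when the transaction fails, which the promise-chain version above does not do on the error path.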

Mongoose's Connection#transaction() function is a wrapper around withTransaction() that integrates Mongoose change tracking with transactions. For example, suppose you save() a document in a transaction that later fails. The changes in that document are not persisted to MongoDB. The Connection#transaction() function informs Mongoose change tracking that the save() was rolled back, and marks all fields that were changed in the transaction as modified.

const doc = new Person({ name: 'Will Riker' });

await db.transaction(async function setRank(session) {
  doc.name = 'Captain';
  await doc.save({ session });
  doc.isNew; // false

  // Throw an error to abort the transaction
  throw new Error('Oops!');
}, { readPreference: 'primary' }).catch(() => {});

// true, `transaction()` reset the document's state because the
// transaction was aborted.
doc.isNew;
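A common use of Connection#transaction() is saving several documents so that either all of them persist or none do. The sketch below assumes conn is a Mongoose connection and docs is an array of Mongoose documents; the helper name saveAllOrNothing is hypothetical.

```javascript
// Hypothetical helper: saves every document in a single transaction.
// `conn` is assumed to be a Mongoose connection and `docs` an array of
// Mongoose documents.
async function saveAllOrNothing(conn, docs) {
  await conn.transaction(async session => {
    for (const doc of docs) {
      // Save one document at a time: running operations in parallel
      // is not supported within a transaction.
      await doc.save({ session });
    }
  });
}
```

If any save() rejects, the error propagates out of transaction(), the transaction is aborted, and none of the earlier saves are persisted.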

With Mongoose Documents and save()

If you get a Mongoose document from findOne() or find() using a session, the document keeps a reference to that session and uses it for save().

To get or set the session associated with a given document, use doc.$session().

const User = db.model('User', new Schema({ name: String }));

let session = null;
return User.createCollection().
  then(() => db.startSession()).
  then(_session => {
    session = _session;
    return User.create({ name: 'foo' });
  }).
  then(() => {
    session.startTransaction();
    return User.findOne({ name: 'foo' }).session(session);
  }).
  then(user => {
    // Getter/setter for the session associated with this document.
    assert.ok(user.$session());
    user.name = 'bar';
    // By default, `save()` uses the associated session
    return user.save();
  }).
  then(() => User.findOne({ name: 'bar' })).
  // Won't find the doc because `save()` is part of an uncommitted transaction
  then(doc => assert.ok(!doc)).
  then(() => session.commitTransaction()).
  then(() => session.endSession()).
  then(() => User.findOne({ name: 'bar' })).
  then(doc => assert.ok(doc));
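The example above uses doc.$session() as a getter. As a setter, it can attach a session to a document that was not loaded through one, such as a document created with new. The sketch below assumes doc is a Mongoose document and session is an open session; the helper name saveWithSession is hypothetical.

```javascript
// Hypothetical helper: associates `session` with `doc`, then saves it.
// `doc` is assumed to be a Mongoose document and `session` an open session.
async function saveWithSession(doc, session) {
  // Called with an argument, `$session()` acts as a setter.
  doc.$session(session);
  // `save()` uses the associated session by default, so no `{ session }`
  // option is needed here.
  return doc.save();
}
```

Passing null, as in doc.$session(null), clears the association so that later save() calls run outside the session.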

With the Aggregation Framework

The Model.aggregate() function also supports transactions. Mongoose aggregations have a session() helper that sets the session option. Below is an example of executing an aggregation within a transaction.

const Event = db.model('Event', new Schema({ createdAt: Date }), 'Event');

let session = null;
return Event.createCollection().
  then(() => db.startSession()).
  then(_session => {
    session = _session;
    session.startTransaction();
    return Event.insertMany([
      { createdAt: new Date('2018-06-01') },
      { createdAt: new Date('2018-06-02') },
      { createdAt: new Date('2017-06-01') },
      { createdAt: new Date('2017-05-31') }
    ], { session: session });
  }).
  then(() => Event.aggregate([
    {
      $group: {
        _id: {
          month: { $month: '$createdAt' },
          year: { $year: '$createdAt' }
        },
        count: { $sum: 1 }
      }
    },
    { $sort: { count: -1, '_id.year': -1, '_id.month': -1 } }
  ]).session(session)).
  then(res => assert.deepEqual(res, [
    { _id: { month: 6, year: 2018 }, count: 2 },
    { _id: { month: 6, year: 2017 }, count: 1 },
    { _id: { month: 5, year: 2017 }, count: 1 }
  ])).
  then(() => session.commitTransaction()).
  then(() => session.endSession());
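The same pattern fits in a small reusable function. The sketch below is illustrative only: the helper name countEventsByYear is hypothetical, Event is assumed to be a Mongoose model with a createdAt date field as in the example above, and session is assumed to be a session with a transaction already started.

```javascript
// Hypothetical helper: counts events per calendar year inside `session`.
// `Event` is assumed to be a Mongoose model with a `createdAt` date field.
async function countEventsByYear(Event, session) {
  const pipeline = [
    { $group: { _id: { $year: '$createdAt' }, count: { $sum: 1 } } },
    { $sort: { _id: 1 } }
  ];
  // `.session(session)` sets the `session` option, so the aggregation
  // reads data as seen from inside the transaction.
  return Event.aggregate(pipeline).session(session);
}
```

Because the aggregation runs with the session, it sees documents inserted earlier in the same transaction even though they are not yet committed.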

Advanced Usage

Advanced users who want finer-grained control over when a transaction commits or aborts can use session.startTransaction() to start a transaction:

const Customer = db.model('Customer', new Schema({ name: String }));

let session = null;
return Customer.createCollection().
  then(() => db.startSession()).
  then(_session => {
    session = _session;
    // Start a transaction
    session.startTransaction();
    // This `create()` is part of the transaction because of the `session`
    // option.
    return Customer.create([{ name: 'Test' }], { session: session });
  }).
  // Transactions execute in isolation, so unless you pass a `session`
  // to `findOne()` you won't see the document until the transaction
  // is committed.
  then(() => Customer.findOne({ name: 'Test' })).
  then(doc => assert.ok(!doc)).
  // This `findOne()` will return the doc, because passing the `session`
  // means this `findOne()` will run as part of the transaction.
  then(() => Customer.findOne({ name: 'Test' }).session(session)).
  then(doc => assert.ok(doc)).
  // Once the transaction is committed, the write operation becomes
  // visible outside of the transaction.
  then(() => session.commitTransaction()).
  then(() => Customer.findOne({ name: 'Test' })).
  then(doc => assert.ok(doc)).
  then(() => session.endSession());

You can also use session.abortTransaction() to abort a transaction:

let session = null;
return Customer.createCollection().
  then(() => Customer.startSession()).
  then(_session => {
    session = _session;
    session.startTransaction();
    return Customer.create([{ name: 'Test' }], { session: session });
  }).
  then(() => Customer.create([{ name: 'Test2' }], { session: session })).
  then(() => session.abortTransaction()).
  then(() => Customer.countDocuments()).
  then(count => assert.strictEqual(count, 0)).
  then(() => session.endSession());
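The manual calls shown in this section can be combined into one async/await pattern that commits on success and aborts on failure. This is a minimal sketch under stated assumptions, not a replacement for withTransaction(): the helper name runInManualTransaction and the work callback are hypothetical, conn is assumed to be a Mongoose connection, and, unlike withTransaction(), nothing here retries on transient transaction errors.

```javascript
// Hypothetical helper: runs `work(session)` in a manually managed
// transaction. `conn` is assumed to be a Mongoose connection.
async function runInManualTransaction(conn, work) {
  const session = await conn.startSession();
  try {
    session.startTransaction();
    let result;
    try {
      // `work` must pass `session` to every operation it performs.
      result = await work(session);
    } catch (err) {
      // Undo every write made with this session, then rethrow.
      await session.abortTransaction();
      throw err;
    }
    // Make the writes visible outside the transaction.
    await session.commitTransaction();
    return result;
  } finally {
    // Always release the session, whether we committed or aborted.
    await session.endSession();
  }
}
```

A caller would pass a function such as session => Customer.create([{ name: 'Test' }], { session }) as the work argument.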