Mongoose 的事务

事务 是 MongoDB 4.0 和 Mongoose 5.2.0 中的新增功能。 事务允许你单独执行多个操作,并且如果其中一个操作失败,则可能会撤消所有操作。 本指南将帮助你开始使用 Mongoose 进行事务处理。

事务入门

如果你还没有导入 mongoose:

import mongoose from 'mongoose';

要创建事务,首先需要使用 Mongoose#startSessionConnection#startSession() 创建会话。

// Using Mongoose's default connection
const session = await mongoose.startSession();

// Using custom connection
const db = await mongoose.createConnection(mongodbUri).asPromise();
const session = await db.startSession();

在实践中,你应该使用 session.withTransaction() 帮手 或 Mongoose 的 Connection#transaction() 函数来运行事务。 session.withTransaction() 助手处理:

  • 创建事务
  • 如果成功则提交事务
  • 如果你的操作抛出异常,则中止事务
  • 出现 瞬时事务错误 时重试。
let session = null;
return Customer.createCollection().
  then(() => Customer.startSession()).
  // The `withTransaction()` function's first parameter is a function
  // that returns a promise.
  then(_session => {
    session = _session;
    return session.withTransaction(() => {
      return Customer.create([{ name: 'Test' }], { session: session });
    });
  }).
  then(() => Customer.countDocuments()).
  then(count => assert.strictEqual(count, 1)).
  then(() => session.endSession());

有关 ClientSession#withTransaction() 功能的更多信息,请参阅 MongoDB Node.js 驱动程序文档

Mongoose 的 Connection#transaction() 功能是 withTransaction() 的封装器,它将 Mongoose 更改跟踪与事务集成在一起。 例如,假设你在事务中 save() 了一个文档,但后来失败了。 该文档中的更改不会保留到 MongoDB。 Connection#transaction() 函数通知 Mongoose 更改跟踪 save() 已回滚,并将事务中更改的所有字段标记为已修改。

const doc = new Person({ name: 'Will Riker' });

await db.transaction(async function setRank(session) {
  doc.name = 'Captain';
  await doc.save({ session });
  doc.isNew; // false

  // Throw an error to abort the transaction
  throw new Error('Oops!');
}, { readPreference: 'primary' }).catch(() => {});

// true, `transaction()` reset the document's state because the
// transaction was aborted.
doc.isNew;

使用 Mongoose 文档和 save()

如果你使用会话从 findOne()find() 获取 Mongoose 文档,则文档将保留对该会话的引用,并将该会话用于 save()

要获取/设置与给定文档关联的会话,请使用 doc.$session()

const User = db.model('User', new Schema({ name: String }));

let session = null;
return User.createCollection().
  then(() => db.startSession()).
  then(_session => {
    session = _session;
    return User.create({ name: 'foo' });
  }).
  then(() => {
    session.startTransaction();
    return User.findOne({ name: 'foo' }).session(session);
  }).
  then(user => {
    // Getter/setter for the session associated with this document.
    assert.ok(user.$session());
    user.name = 'bar';
    // By default, `save()` uses the associated session
    return user.save();
  }).
  then(() => User.findOne({ name: 'bar' })).
  // Won't find the doc because `save()` is part of an uncommitted transaction
  then(doc => assert.ok(!doc)).
  then(() => session.commitTransaction()).
  then(() => session.endSession()).
  then(() => User.findOne({ name: 'bar' })).
  then(doc => assert.ok(doc));

使用聚合框架

Model.aggregate() 功能还支持事务。 Mongoose 聚合有一个设置 session 选项session() 帮手。 下面是在事务中执行聚合的示例。

const Event = db.model('Event', new Schema({ createdAt: Date }), 'Event');

let session = null;
return Event.createCollection().
  then(() => db.startSession()).
  then(_session => {
    session = _session;
    session.startTransaction();
    return Event.insertMany([
      { createdAt: new Date('2018-06-01') },
      { createdAt: new Date('2018-06-02') },
      { createdAt: new Date('2017-06-01') },
      { createdAt: new Date('2017-05-31') }
    ], { session: session });
  }).
  then(() => Event.aggregate([
    {
      $group: {
        _id: {
          month: { $month: '$createdAt' },
          year: { $year: '$createdAt' }
        },
        count: { $sum: 1 }
      }
    },
    { $sort: { count: -1, '_id.year': -1, '_id.month': -1 } }
  ]).session(session)).
  then(res => assert.deepEqual(res, [
    { _id: { month: 6, year: 2018 }, count: 2 },
    { _id: { month: 6, year: 2017 }, count: 1 },
    { _id: { month: 5, year: 2017 }, count: 1 }
  ])).
  then(() => session.commitTransaction()).
  then(() => session.endSession());

高级用法

想要更细粒度地控制何时提交或中止事务的高级用户可以使用 session.startTransaction() 来启动事务:

const Customer = db.model('Customer', new Schema({ name: String }));

let session = null;
return Customer.createCollection().
  then(() => db.startSession()).
  then(_session => {
    session = _session;
    // Start a transaction
    session.startTransaction();
    // This `create()` is part of the transaction because of the `session`
    // option.
    return Customer.create([{ name: 'Test' }], { session: session });
  }).
  // Transactions execute in isolation, so unless you pass a `session`
  // to `findOne()` you won't see the document until the transaction
  // is committed.
  then(() => Customer.findOne({ name: 'Test' })).
  then(doc => assert.ok(!doc)).
  // This `findOne()` will return the doc, because passing the `session`
  // means this `findOne()` will run as part of the transaction.
  then(() => Customer.findOne({ name: 'Test' }).session(session)).
  then(doc => assert.ok(doc)).
  // Once the transaction is committed, the write operation becomes
  // visible outside of the transaction.
  then(() => session.commitTransaction()).
  then(() => Customer.findOne({ name: 'Test' })).
  then(doc => assert.ok(doc)).
  then(() => session.endSession());

你还可以使用 session.abortTransaction() 中止事务:

let session = null;
return Customer.createCollection().
  then(() => Customer.startSession()).
  then(_session => {
    session = _session;
    session.startTransaction();
    return Customer.create([{ name: 'Test' }], { session: session });
  }).
  then(() => Customer.create([{ name: 'Test2' }], { session: session })).
  then(() => session.abortTransaction()).
  then(() => Customer.countDocuments()).
  then(count => assert.strictEqual(count, 0)).
  then(() => session.endSession());