diff --git a/test/examples/change_streams.js b/test/examples/change_streams.js new file mode 100644 index 0000000000..f9e998fc0a --- /dev/null +++ b/test/examples/change_streams.js @@ -0,0 +1,124 @@ +'use strict'; + +const setupDatabase = require('../functional/shared').setupDatabase; +const expect = require('chai').expect; +const MongoClient = require('../../lib/mongo_client'); + +describe('examples(change-stream):', function() { + let client; + let db; + + before(async function() { + await setupDatabase(this.configuration); + }); + + beforeEach(async function() { + client = await MongoClient.connect(this.configuration.url()); + db = client.db(this.configuration.db); + + await db.collection('inventory').deleteMany({}); + }); + + afterEach(async function() { + await client.close(); + client = undefined; + db = undefined; + }); + + it('Open A Change Stream', { + metadata: { requires: { topology: ['replicaset'], mongodb: '>=3.6.0' } }, + test: async function() { + setTimeout(async function() { + await db.collection('inventory').insertOne({ a: 1 }); + }); + + // Start Changestream Example 1 + const collection = db.collection('inventory'); + const changeStream = collection.watch(); + const next = await changeStream.next(); + // End Changestream Example 1 + + await changeStream.close(); + + expect(next) + .to.have.property('operationType') + .that.equals('insert'); + } + }); + + it('Lookup Full Document for Update Operations', { + metadata: { requires: { topology: ['replicaset'], mongodb: '>=3.6.0' } }, + test: async function() { + await db.collection('inventory').insertOne({ a: 1, b: 2 }); + setTimeout(async function() { + await db.collection('inventory').updateOne({ a: 1 }, { $set: { a: 2 } }); + }); + + // Start Changestream Example 2 + const collection = db.collection('inventory'); + const changeStream = collection.watch({ fullDocument: 'updateLookup' }); + const next = await changeStream.next(); + // End Changestream Example 2 + + await changeStream.close(); + + expect(next) + .to.have.property('operationType') + .that.equals('update'); + expect(next) + .to.have.property('fullDocument') + .that.has.all.keys(['_id', 'a', 'b']); + } + }); + + it('Resume a Change Stream', { + metadata: { requires: { topology: ['replicaset'], mongodb: '>=3.6.0' } }, + test: async function() { + setTimeout(async function() { + await db.collection('inventory').insertOne({ a: 1 }); + await db.collection('inventory').insertOne({ b: 2 }); + }); + + // Start Changestream Example 3 + const collection = db.collection('inventory'); + const changeStream = collection.watch(); + const change1 = await changeStream.next(); + + const resumeAfter = change1._id; + changeStream.close(); + + const newChangeStream = collection.watch({ resumeAfter }); + const change2 = await newChangeStream.next(); + // End Changestream Example 3 + + await newChangeStream.close(); + + expect(change1).to.have.nested.property('fullDocument.a', 1); + expect(change2).to.have.nested.property('fullDocument.b', 2); + } + }); + + it('Modify Change Stream Output', { + metadata: { requires: { topology: ['replicaset'], mongodb: '>=3.6.0' } }, + test: async function() { + setTimeout(async function() { + await db.collection('inventory').insertOne({ username: 'alice' }); + }); + + // Start Changestream Example 4 + const pipeline = [ + { $match: { 'fullDocument.username': 'alice' } }, + { $addFields: { newField: 'this is an added field!' } } + ]; + const collection = db.collection('inventory'); + const changeStream = collection.watch(pipeline); + const next = await changeStream.next(); + // End Changestream Example 4 + + await changeStream.close(); + + expect(next).to.have.nested.property('fullDocument.username', 'alice'); + expect(next).to.have.property('newField', 'this is an added field!'); + } + }); +}); diff --git a/test/functional/operation_changestream_example_tests.js b/test/functional/operation_changestream_example_tests.js deleted file mode 100644 index 183ccdfdd3..0000000000 --- a/test/functional/operation_changestream_example_tests.js +++ /dev/null @@ -1,269 +0,0 @@ -'use strict'; -const setupDatabase = require('./shared').setupDatabase; -const expect = require('chai').expect; - -describe('Changestream Examples', function() { - before(function() { - return setupDatabase(this.configuration); - }); - - it('supports hasNext', { - metadata: { - requires: { - mongodb: '>=3.6.x', - topology: ['replicaset'] - } - }, - - test: function(done) { - const configuration = this.configuration; - const client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 }); - client.connect(function(err, client) { - const db = client.db(configuration.db); - const collection = db.collection('changeStreamExample1a'); - - collection.insertOne({ b: 2 }, function() { - // Start Changestream Example 1 - const changeStream = collection.watch(); - changeStream.next(function(err, next) { - expect(err).to.equal(null); - expect(next).to.exist; - - // Since changeStream has an implicit seession, - // we need to close the changeStream for unit testing purposes - changeStream.close(); - client.close(); - done(); - }); - // End Changestream Example 1 - - // Insert something - setTimeout(function() { - collection.insertOne({ a: 1 }, function(err, result) { - expect(err).to.equal(null); - expect(result).to.exist; - }); - }); - }); - }); - } - }); - - it('supports the EventEmitter api', { - metadata: { - requires: { - mongodb: '>=3.6.x', - topology: ['replicaset'] - } - }, - - test: function(done) { - const configuration = this.configuration; - const client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 }); - client.connect(function(err, client) { - const db = client.db(configuration.db); - const collection = db.collection('changeStreamExample1b'); - - // Using event emitter API - const changeStream = collection.watch(); - changeStream.on('change', function(change) { - expect(change).to.exist; - - // Since changeStream has an implicit seession, - // we need to close the changeStream for unit testing purposes - changeStream.close(); - client.close(); - done(); - }); - - // Insert something - setTimeout(function() { - collection.insertOne({ a: 1 }, function(err, result) { - expect(err).to.equal(null); - expect(result).to.exist; - }); - }); - }); - } - }); - - it('can stream a ChangeStream', { - metadata: { - requires: { - mongodb: '>=3.6.x', - topology: ['replicaset'] - } - }, - - test: function(done) { - const configuration = this.configuration; - const client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 }); - client.connect(function(err, client) { - const db = client.db(configuration.db); - const collection = db.collection('changeStreamExample1c'); - const changeStream = collection.watch(); - - changeStream.stream({ transform: JSON.stringify }).once('data', function(chunk) { - expect(chunk).to.exist; - - // Since changeStream has an implicit seession, - // we need to close the changeStream for unit testing purposes - changeStream.close(); - client.close(); - done(); - }); - - // Insert something - setTimeout(function() { - collection.insertOne({ a: 1 }, function(err, result) { - expect(err).to.equal(null); - expect(result).to.exist; - }); - }); - }); - } - }); - - it('can specify a full document update', { - metadata: { - requires: { - mongodb: '>=3.6.x', - topology: ['replicaset'] - } - }, - - test: function(done) { - const configuration = this.configuration; - const client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 }); - client.connect(function(err, client) { - const db = client.db(configuration.db); - const collection = db.collection('changeStreamExample1b'); - - // Start Changestream Example 2 - const changeStream = collection.watch({ fullDocument: 'updateLookup' }); - changeStream.on('change', function(change) { - expect(change).to.exist; - - // Since changeStream has an implicit seession, - // we need to close the changeStream for unit testing purposes - changeStream.close(); - client.close(); - done(); - }); - // End Changestream Eample 2 - - // Insert something - setTimeout(function() { - collection.insertOne({ a: 1 }, function(err, result) { - expect(err).to.equal(null); - expect(result).to.exist; - }); - }); - }); - } - }); - - it('creates and uses a resume token', { - metadata: { - requires: { - mongodb: '>=3.6.x', - topology: ['replicaset'] - } - }, - - test: function(done) { - const configuration = this.configuration; - const client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 }); - client.connect(function(err, client) { - const db = client.db(configuration.db); - const collection = db.collection('changeStreamExample3'); - // Start Changestream Example 3 - let resumeToken; - - const changeStream = collection.watch(); - changeStream.hasNext(function(err, change) { - expect(err).to.equal(null); - expect(change).to.exist; - changeStream.next(function(err, change) { - expect(err).to.equal(null); - - resumeToken = change._id; - - expect(change._id).to.exist; - expect(changeStream.resumeToken).to.exist; - - changeStream.close(function(err) { - expect(err).to.equal(null); - const newChangeStream = collection.watch({ resumeAfter: resumeToken }); - - newChangeStream.next(function(err, next) { - expect(err).to.equal(null); - expect(next).to.exist; - - // Since changeStream has an implicit seession, - // we need to close the changeStream for unit testing purposes - newChangeStream.close(); - client.close(); - done(); - }); - }); - }); - }); - // End Changestream Example 3 - // Insert something - setTimeout(function() { - collection.insertOne({ a: 1 }, function(err, result) { - expect(err).to.equal(null); - expect(result).to.exist; - // Insert something else - collection.insertOne({ a: 2 }, function(err, result) { - expect(err).to.equal(null); - expect(result).to.exist; - }); - }); - }); - }); - } - }); - - it('should support an aggregation pipeline as the first paramter of watch', { - metadata: { requires: { mongodb: '>=3.6.x', topology: ['replicaset'] } }, - test: function(done) { - const configuration = this.configuration; - const client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 }); - client.connect(function(err, client) { - const db = client.db(configuration.db); - const collection = db.collection('changeStreamExample1a'); - - // Start Changestream Example 4 - const pipeline = [ - { $match: { 'fullDocument.username': 'alice' } }, - { $addFields: { newField: 'this is an added field!' } } - ]; - - const changeStream = collection.watch(pipeline); - changeStream.next(function(err, next) { - expect(err).to.not.exist; - expect(next).to.exist; - expect(next.fullDocument.username).to.equal('alice'); - expect(next.newField).to.exist; - expect(next.newField).to.equal('this is an added field!'); - - // Since changeStream has an implicit seession, - // we need to close the changeStream for unit testing purposes - changeStream.close(); - client.close(); - done(); - }); - // End Changestream Example 4 - - setTimeout(function() { - collection.insertOne({ username: 'alice' }, function(err, result) { - expect(err).to.not.exist; - expect(result).to.exist; - }); - }); - }); - } - }); -});