diff --git a/Package.swift b/Package.swift index f6493e28..ffffbd48 100644 --- a/Package.swift +++ b/Package.swift @@ -16,11 +16,13 @@ let package = Package( .package(url: "https://github.com/apple/swift-nio.git", from: "2.0.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"), .package(url: "https://github.com/vapor/sql-kit.git", from: "3.0.0-rc.1"), + .package(url: "https://github.com/vapor/async-kit.git", from: "1.0.0-rc.1"), ], targets: [ .target(name: "FluentKit", dependencies: [ .product(name: "NIO", package: "swift-nio"), .product(name: "Logging", package: "swift-log"), + .product(name: "AsyncKit", package: "async-kit"), ]), .target(name: "FluentBenchmark", dependencies: [ .target(name: "FluentKit"), diff --git a/Sources/FluentBenchmark/FluentBenchmarker.swift b/Sources/FluentBenchmark/FluentBenchmarker.swift index 82aaaf77..9630fe22 100644 --- a/Sources/FluentBenchmark/FluentBenchmarker.swift +++ b/Sources/FluentBenchmark/FluentBenchmarker.swift @@ -14,6 +14,7 @@ public final class FluentBenchmarker { } public init(databases: Databases) { + precondition(databases.ids().count >= 2, "FluentBenchmarker Databases instance must have 2 or more registered databases") self.databases = databases } diff --git a/Sources/FluentBenchmark/SolarSystem/SolarSystem.swift b/Sources/FluentBenchmark/SolarSystem/SolarSystem.swift index 35b71cbd..a5b299d2 100644 --- a/Sources/FluentBenchmark/SolarSystem/SolarSystem.swift +++ b/Sources/FluentBenchmark/SolarSystem/SolarSystem.swift @@ -1,3 +1,5 @@ +import AsyncKit + private let migrations: [Migration] = [ GalaxyMigration(), StarMigration(), @@ -29,9 +31,8 @@ public struct SolarSystem: Migration { } else { all = migrations } - return .andAllSync(all.map { migration in - { migration.prepare(on: database) } - }, on: database.eventLoop) + + return EventLoopFutureQueue(eventLoop: database.eventLoop).append(each: all) { $0.prepare(on: database) } } public func revert(on database: Database) -> EventLoopFuture { @@ -41,8 +42,7 @@ public struct SolarSystem: Migration { } else { all = migrations } - return .andAllSync(all.reversed().map { migration in - { migration.revert(on: database) } - }, on: database.eventLoop) + + return EventLoopFutureQueue(eventLoop: database.eventLoop).append(each: all.reversed()) { $0.revert(on: database) } } } diff --git a/Sources/FluentBenchmark/Tests/MigratorTests.swift b/Sources/FluentBenchmark/Tests/MigratorTests.swift index 2960d6c8..e3e2c240 100644 --- a/Sources/FluentBenchmark/Tests/MigratorTests.swift +++ b/Sources/FluentBenchmark/Tests/MigratorTests.swift @@ -2,6 +2,7 @@ extension FluentBenchmarker { public func testMigrator() throws { try self.testMigrator_success() try self.testMigrator_error() + try self.testMigrator_sequence() } private func testMigrator_success() throws { @@ -53,6 +54,83 @@ extension FluentBenchmarker { try migrator.revertAllBatches().wait() } } + + private func testMigrator_sequence() throws { + try self.runTest(#function, []) { + + // Setup + let ids = Array(self.databases.ids()) + let databaseID = (ids[0], ids[1]) + + let database1 = try XCTUnwrap( + self.databases.database( + databaseID.0, + logger: Logger(label: "codes.vapor.tests"), + on: self.databases.eventLoopGroup.next() + ) + ) + let database2 = try XCTUnwrap( + self.databases.database( + databaseID.1, + logger: Logger(label: "codes.vapor.tests"), + on: self.databases.eventLoopGroup.next() + ) + ) + + let migrations = Migrations() + + + // Migration #1 + migrations.add(GalaxyMigration(), to: databaseID.0) + + let migrator = Migrator( + databases: self.databases, + migrations: migrations, + logger: Logger(label: "codes.vapor.tests"), + on: self.databases.eventLoopGroup.next() + ) + + try migrator.setupIfNeeded().wait() + try migrator.prepareBatch().wait() + + let logs1 = try MigrationLog.query(on: database1).all().wait() + XCTAssertEqual(logs1.count, 1) + let log1 = try XCTUnwrap(logs1.first) + XCTAssertEqual(log1.batch, 1) + XCTAssertEqual(log1.name, "\(GalaxyMigration.self)") + + do { + let count = try MigrationLog.query(on: database2).count().wait() + + // This is a valid state to enter. Unlike databases in the SQL family, + // some databases such as MongoDB won't throw an error if the table doesn't exist. + XCTAssertEqual(count, 0) + } catch { + // This is a valid state to enter. A SQL database will throw an error + // because the `_fluent_migrations` table on the `database2` database + // will have not been created yet. + } + + + // Migration #2 + migrations.add(GalaxyMigration(), to: databaseID.1) + + try migrator.setupIfNeeded().wait() + try migrator.prepareBatch().wait() + + let logs2 = try MigrationLog.query(on: database2).all().wait() + XCTAssertEqual(logs2.count, 1) + let log2 = try XCTUnwrap(logs2.first) + XCTAssertEqual(log2.batch, 1) + XCTAssertEqual(log2.name, "\(GalaxyMigration.self)") + + try XCTAssertEqual(MigrationLog.query(on: database1).count().wait(), 1) + + + // Teardown + try migrator.revertAllBatches().wait() + } + } } private struct ErrorMigration: Migration { diff --git a/Sources/FluentKit/Database/Databases.swift b/Sources/FluentKit/Database/Databases.swift index 705f8478..e899754b 100644 --- a/Sources/FluentKit/Database/Databases.swift +++ b/Sources/FluentKit/Database/Databases.swift @@ -122,6 +122,10 @@ public final class Databases { } } + public func ids() -> Set { + return self.lock.withLock { Set(self.configurations.keys) } + } + public func shutdown() { self.lock.lock() defer { self.lock.unlock() } diff --git a/Sources/FluentKit/Migration/Migrations.swift b/Sources/FluentKit/Migration/Migrations.swift index ad289966..25ce79e6 100644 --- a/Sources/FluentKit/Migration/Migrations.swift +++ b/Sources/FluentKit/Migration/Migrations.swift @@ -5,6 +5,7 @@ public final class Migrations { } var storage: [Item] + var databases: Set { Set(self.storage.map(\.id)) } public init() { self.storage = [] diff --git a/Sources/FluentKit/Migration/Migrator.swift b/Sources/FluentKit/Migration/Migrator.swift index a39461b7..0c4d17e4 100644 --- a/Sources/FluentKit/Migration/Migrator.swift +++ b/Sources/FluentKit/Migration/Migrator.swift @@ -1,11 +1,12 @@ import Foundation +import AsyncKit import Logging public struct Migrator { public let databaseFactory: (DatabaseID?) -> (Database) public let migrations: Migrations public let eventLoop: EventLoop - + public init( databases: Databases, migrations: Migrations, @@ -34,181 +35,204 @@ public struct Migrator { // MARK: Setup public func setupIfNeeded() -> EventLoopFuture { - MigrationLog.migration.prepare(on: self.database(nil)) + return self.migrators() { $0.setupIfNeeded() }.transform(to: ()) } // MARK: Prepare public func prepareBatch() -> EventLoopFuture { - self.unpreparedMigrations().flatMap { migrations in - self.lastBatchNumber() - .and(value: migrations) - }.flatMap { (lastBatch, migrations) in - .andAllSync(migrations.map { item in - { self.prepare(item, batch: lastBatch + 1) } - }, on: self.eventLoop) - } + return self.migrators() { $0.prepareBatch() }.transform(to: ()) } // MARK: Revert public func revertLastBatch() -> EventLoopFuture { - self.lastBatchNumber().flatMap { - self.revertBatch(number: $0) - } + return self.migrators() { $0.revertLastBatch() }.transform(to: ()) } public func revertBatch(number: Int) -> EventLoopFuture { - self.preparedMigrations(batch: number).flatMap { migrations in - EventLoopFuture.andAllSync(migrations.map { item in - { self.revert(item) } - }, on: self.eventLoop) - } + return self.migrators() { $0.revertBatch(number: number) }.transform(to: ()) } public func revertAllBatches() -> EventLoopFuture { - self.preparedMigrations().flatMap { migrations in - .andAllSync(migrations.map { item in - { self.revert(item) } - }, on: self.eventLoop) - }.flatMap { _ in - self.revertMigrationLog() - } + return self.migrators() { $0.revertAllBatches() }.transform(to: ()) } // MARK: Preview public func previewPrepareBatch() -> EventLoopFuture<[(Migration, DatabaseID?)]> { - self.unpreparedMigrations().map { items in - items.map { item in - (item.migration, item.id) + return self.migrators() { migrator in + return migrator.previewPrepareBatch().and(value: migrator.id) + }.map { items in + return items.reduce(into: []) { result, batch in + let pairs = batch.0.map { ($0, batch.1) } + result.append(contentsOf: pairs) } } } public func previewRevertLastBatch() -> EventLoopFuture<[(Migration, DatabaseID?)]> { - self.lastBatchNumber().flatMap { lastBatch in - self.preparedMigrations(batch: lastBatch) + return self.migrators() { migrator in + return migrator.previewRevertLastBatch().and(value: migrator.id) }.map { items in - items.map { item in - (item.migration, item.id) + return items.reduce(into: []) { result, batch in + let pairs = batch.0.map { ($0, batch.1) } + result.append(contentsOf: pairs) } } } - public func previewRevertBatch(number: Int) -> EventLoopFuture<[(Migration, DatabaseID?)]> { - self.preparedMigrations(batch: number).map { items -> [(Migration, DatabaseID?)] in - items.map { item -> (Migration, DatabaseID?) in - return (item.migration, item.id) + public func previewRevertBatch() -> EventLoopFuture<[(Migration, DatabaseID?)]> { + return self.migrators() { migrator in + return migrator.previewPrepareBatch().and(value: migrator.id) + }.map { items in + return items.reduce(into: []) { result, batch in + let pairs = batch.0.map { ($0, batch.1) } + result.append(contentsOf: pairs) } } } public func previewRevertAllBatches() -> EventLoopFuture<[(Migration, DatabaseID?)]> { - self.preparedMigrations().map { items -> [(Migration, DatabaseID?)] in - items.map { item -> (Migration, DatabaseID?) in - return (item.migration, item.id) + return self.migrators() { migrator in + return migrator.previewRevertAllBatches().and(value: migrator.id) + }.map { items in + return items.reduce(into: []) { result, batch in + let pairs = batch.0.map { ($0, batch.1) } + result.append(contentsOf: pairs) } } } - + + + private func migrators( + _ handler: (DatabaseMigrator) -> EventLoopFuture + ) -> EventLoopFuture<[Result]> { + return self.migrations.databases.map { id in + let migrations = self.migrations.storage.compactMap { item -> Migration? in + guard item.id == id else { return nil } + return item.migration + } + + let migrator = DatabaseMigrator(id: id, database: self.databaseFactory(id), migrations: migrations) + return handler(migrator) + }.flatten(on: self.eventLoop) + } +} + +private final class DatabaseMigrator { + let migrations: [Migration] + let database: Database + let id: DatabaseID? + + init(id: DatabaseID?, database: Database, migrations: [Migration]) { + self.migrations = migrations + self.database = database + self.id = id + } + + // MARK: Setup + + func setupIfNeeded() -> EventLoopFuture { + return MigrationLog.migration.prepare(on: self.database) + } + + // MARK: Prepare + + func prepareBatch() -> EventLoopFuture { + return self.unpreparedMigrations().flatMap { migrations in + return self.lastBatchNumber().and(value: migrations) + }.flatMap { batch, migrations in + return EventLoopFutureQueue(eventLoop: self.database.eventLoop).append(each: migrations) { migration in + self.prepare(migration, batch: batch + 1) + } + } + } + + // MARK: Revert + + func revertLastBatch() -> EventLoopFuture { + return self.lastBatchNumber().flatMap(self.revertBatch(number:)) + } + + func revertBatch(number: Int) -> EventLoopFuture { + return self.preparedMigrations(batch: number).flatMap { migrations in + return EventLoopFutureQueue(eventLoop: self.database.eventLoop).append(each: migrations, self.revert) + } + } + + func revertAllBatches() -> EventLoopFuture { + return self.preparedMigrations().flatMap { migrations in + return EventLoopFutureQueue(eventLoop: self.database.eventLoop).append(each: migrations, self.revert) + } + } + + // MARK: Preview + + func previewPrepareBatch() -> EventLoopFuture<[Migration]> { + return self.unpreparedMigrations() + } + + func previewRevertLastBatch() -> EventLoopFuture<[Migration]> { + return self.lastBatchNumber().flatMap { batch in + return self.preparedMigrations(batch: batch) + } + } + + func previewRevertBatch(number: Int) -> EventLoopFuture<[Migration]> { + return self.preparedMigrations(batch: number) + } + + func previewRevertAllBatches() -> EventLoopFuture<[Migration]> { + return self.preparedMigrations() + } + // MARK: Private - - private func prepare(_ item: Migrations.Item, batch: Int) -> EventLoopFuture { - item.migration.prepare(on: self.database(item.id)).flatMap { - MigrationLog(name: item.migration.name, batch: batch) - .save(on: self.database(nil)) + + private func prepare(_ migration: Migration, batch: Int) -> EventLoopFuture { + return migration.prepare(on: self.database).flatMap { + return MigrationLog(name: migration.name, batch: batch).save(on: self.database) } } - - private func revert(_ item: Migrations.Item) -> EventLoopFuture { - item.migration.revert(on: self.database(item.id)).flatMap { - MigrationLog.query(on: self.database(nil)) - .filter(\.$name == item.migration.name) - .delete() + + private func revert(_ migration: Migration) -> EventLoopFuture { + return migration.revert(on: self.database).flatMap { + return MigrationLog.query(on: self.database).filter(\.$name == migration.name).delete() } } - + private func revertMigrationLog() -> EventLoopFuture { - MigrationLog.migration.revert(on: self.database(nil)) + return MigrationLog.migration.revert(on: self.database) } - + private func lastBatchNumber() -> EventLoopFuture { - MigrationLog.query(on: self.database(nil)).sort(\.$batch, .descending).first().map { log in + return MigrationLog.query(on: self.database).sort(\.$batch, .descending).first().map { log in log?.batch ?? 0 } } - - private func preparedMigrations() -> EventLoopFuture<[Migrations.Item]> { - MigrationLog.query(on: self.database(nil)).all().map { logs -> [Migrations.Item] in - self.migrations.storage.filter { storage in - logs.contains { log in - storage.migration.name == log.name - } + + private func preparedMigrations() -> EventLoopFuture<[Migration]> { + return MigrationLog.query(on: self.database).all().map { logs in + return self.migrations.filter { migration in + return logs.contains(where: { $0.name == migration.name }) }.reversed() } } - - private func preparedMigrations(batch: Int) -> EventLoopFuture<[Migrations.Item]> { - MigrationLog.query(on: self.database(nil)).filter(\.$batch == batch).all().map { logs in - self.migrations.storage.filter { storage in - logs.contains { log in - storage.migration.name == log.name - } + + private func preparedMigrations(batch: Int) -> EventLoopFuture<[Migration]> { + return MigrationLog.query(on: self.database).filter(\.$batch == batch).all().map { logs in + return self.migrations.filter { migration in + return logs.contains(where: { $0.name == migration.name }) }.reversed() } } - - private func unpreparedMigrations() -> EventLoopFuture<[Migrations.Item]> { - return MigrationLog.query(on: self.database(nil)) - .all() - .map - { logs -> [Migrations.Item] in - return self.migrations.storage.compactMap { item in - if logs.filter({ $0.name == item.migration.name }).count == 0 { - return item - } else { - // log found, this has been prepared - return nil - } - } - } - } - - private func database(_ id: DatabaseID?) -> Database { - self.databaseFactory(id) - } -} -extension EventLoopFuture { - public static func andAllSync( - _ futures: [() -> EventLoopFuture], - on eventLoop: EventLoop - ) -> EventLoopFuture { - let promise = eventLoop.makePromise(of: Void.self) - - var iterator = futures.makeIterator() - func handle(_ future: () -> EventLoopFuture) { - future().whenComplete { res in - switch res { - case .success: - if let next = iterator.next() { - handle(next) - } else { - promise.succeed(()) - } - case .failure(let error): - promise.fail(error) - } + private func unpreparedMigrations() -> EventLoopFuture<[Migration]> { + return MigrationLog.query(on: self.database).all().map { logs in + return self.migrations.compactMap { migration in + if logs.contains(where: { $0.name == migration.name }) { return nil } + return migration } } - - if let first = iterator.next() { - handle(first) - } else { - promise.succeed(()) - } - - return promise.futureResult } } diff --git a/Sources/FluentKit/Query/Builder/QueryBuilder.swift b/Sources/FluentKit/Query/Builder/QueryBuilder.swift index dce60d96..b2bd3230 100644 --- a/Sources/FluentKit/Query/Builder/QueryBuilder.swift +++ b/Sources/FluentKit/Query/Builder/QueryBuilder.swift @@ -1,3 +1,4 @@ +import AsyncKit import NIO public final class QueryBuilder @@ -206,9 +207,9 @@ public final class QueryBuilder return self.database.eventLoop.makeSucceededFuture(()) } // run eager loads - return .andAllSync(self.eagerLoaders.map { eagerLoad in - { eagerLoad.anyRun(models: all, on: self.database) } - }, on: self.database.eventLoop) + return EventLoopFutureQueue(eventLoop: self.database.eventLoop).append(each: self.eagerLoaders) { loader in + return loader.anyRun(models: all, on: self.database) + } } } else { return done