From cff128fe48e7bc1fbdfbfc00b68df2a0bddef542 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcos=20S=C3=A1nchez-Dehesa=20Carballo?= Date: Thu, 8 Oct 2020 13:48:44 +0200 Subject: [PATCH] Await operator added --- README.md | 19 ++++++++++++- .../{Assign.swift => AssignOp.swift} | 0 sources/runtime/operators/AwaitOp.swift | 27 ++++++++++++++++++ tests/runtime/operators/AwaitOpTests.swift | 28 +++++++++++++++++++ 4 files changed, 73 insertions(+), 1 deletion(-) rename sources/runtime/operators/{Assign.swift => AssignOp.swift} (100%) create mode 100644 sources/runtime/operators/AwaitOp.swift create mode 100644 tests/runtime/operators/AwaitOpTests.swift diff --git a/README.md b/README.md index 4f5eb92..287139a 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ import PackageDescription let package = Package( /* Your package name, supported platforms, and generated products go here */ dependencies: [ - .package(url: "https://github.com/dehesa/Conbini.git", from: "0.6.1") + .package(url: "https://github.com/dehesa/Conbini.git", from: "0.6.2") ], targets: [ .target(name: /* Your target name here */, dependencies: ["Conbini"]) @@ -132,6 +132,23 @@ Conbini also introduces the `assign(to:onUnowned:)` operator which also avoids m

+
await

+ +Wait synchronously for the response of the receiving publisher. + +```swift +let publisher = Just("Hello") + .delay(for: 2, scheduler: DispatchQueue.global()) + +let greeting = publisher.await +``` + +The synchronous wait is performed through `DispatchGroup`s. Please, consider where are you using `await`, since the executing queue stops and waits for an answer: +- Never call this property from `DispatchQueue.main` or any other queue who is performing any background tasks. +- Awaiting publishers should never process events in the same queue as the executing queue (or the queue will become stalled). + +

+
invoke(_:on:) variants.

This operator calls the specified function on the given value/reference passing the upstream value. diff --git a/sources/runtime/operators/Assign.swift b/sources/runtime/operators/AssignOp.swift similarity index 100% rename from sources/runtime/operators/Assign.swift rename to sources/runtime/operators/AssignOp.swift diff --git a/sources/runtime/operators/AwaitOp.swift b/sources/runtime/operators/AwaitOp.swift new file mode 100644 index 0000000..f409102 --- /dev/null +++ b/sources/runtime/operators/AwaitOp.swift @@ -0,0 +1,27 @@ +import Combine +import Foundation + +extension Publisher { + /// Subscribes to the receiving publichser and expects a single value and a subsequent successfull completion. + /// + /// If no values is received, or more than one value is received, or a failure is received, the program will crash. + /// - warning: The publisher must receive the value and completion event in a different queue from the queue where this property is called or the code will never execute. + @inlinable public var await: Output { + let group = DispatchGroup() + group.enter() + + var value: Output? = nil + let cancellable = self.sink(fixedDemand: 1, receiveCompletion: { + switch $0 { + case .failure(let error): fatalError("\(error)") + case .finished: + guard case .some = value else { fatalError() } + group.leave() + } + }, receiveValue: { value = $0 }) + + group.wait() + cancellable.cancel() + return value! + } +} diff --git a/tests/runtime/operators/AwaitOpTests.swift b/tests/runtime/operators/AwaitOpTests.swift new file mode 100644 index 0000000..1171395 --- /dev/null +++ b/tests/runtime/operators/AwaitOpTests.swift @@ -0,0 +1,28 @@ +import XCTest +import Conbini +import Combine + +/// Tests the correct behavior of the `await` operator. +final class AwaitOpTests: XCTestCase { + override func setUp() { + self.continueAfterFailure = false + } +} + +extension AwaitOpTests { + /// Tests the `await` operator. + func testAwait() { + let publisher = Just("Hello") + .delay(for: 1, scheduler: DispatchQueue.global()) + + let queue = DispatchQueue(label: "io.dehesa.conbini.tests.await") + let cancellable = Just(()) + .delay(for: 10, scheduler: queue) + .sink { XCTFail("The await test failed") } + + let greeting = publisher.await + XCTAssertEqual(greeting, "Hello") + + cancellable.cancel() + } +}