Skip to content

Commit

Permalink
Await operator added
Browse files Browse the repository at this point in the history
  • Loading branch information
dehesa committed Oct 8, 2020
1 parent 4fb8c17 commit cff128f
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 1 deletion.
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import PackageDescription
let package = Package(
/* Your package name, supported platforms, and generated products go here */
dependencies: [
.package(url: "https://github.com/dehesa/Conbini.git", from: "0.6.1")
.package(url: "https://github.com/dehesa/Conbini.git", from: "0.6.2")
],
targets: [
.target(name: /* Your target name here */, dependencies: ["Conbini"])
Expand Down Expand Up @@ -132,6 +132,23 @@ Conbini also introduces the `assign(to:onUnowned:)` operator which also avoids m

</p></details>

<details><summary><code>await</code></summary><p>

Wait synchronously for the response of the receiving publisher.

```swift
let publisher = Just("Hello")
.delay(for: 2, scheduler: DispatchQueue.global())

let greeting = publisher.await
```

The synchronous wait is performed through `DispatchGroup`s. Please, consider where are you using `await`, since the executing queue stops and waits for an answer:
- Never call this property from `DispatchQueue.main` or any other queue who is performing any background tasks.
- Awaiting publishers should never process events in the same queue as the executing queue (or the queue will become stalled).

</p></details>

<details><summary><code>invoke(_:on:)</code> variants.</summary><p>

This operator calls the specified function on the given value/reference passing the upstream value.
Expand Down
File renamed without changes.
27 changes: 27 additions & 0 deletions sources/runtime/operators/AwaitOp.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import Combine
import Foundation

extension Publisher {
/// Subscribes to the receiving publichser and expects a single value and a subsequent successfull completion.
///
/// If no values is received, or more than one value is received, or a failure is received, the program will crash.
/// - warning: The publisher must receive the value and completion event in a different queue from the queue where this property is called or the code will never execute.
@inlinable public var await: Output {
let group = DispatchGroup()
group.enter()

var value: Output? = nil
let cancellable = self.sink(fixedDemand: 1, receiveCompletion: {
switch $0 {
case .failure(let error): fatalError("\(error)")
case .finished:
guard case .some = value else { fatalError() }
group.leave()
}
}, receiveValue: { value = $0 })

group.wait()
cancellable.cancel()
return value!
}
}
28 changes: 28 additions & 0 deletions tests/runtime/operators/AwaitOpTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import XCTest
import Conbini
import Combine

/// Tests the correct behavior of the `await` operator.
final class AwaitOpTests: XCTestCase {
override func setUp() {
self.continueAfterFailure = false
}
}

extension AwaitOpTests {
/// Tests the `await` operator.
func testAwait() {
let publisher = Just("Hello")
.delay(for: 1, scheduler: DispatchQueue.global())

let queue = DispatchQueue(label: "io.dehesa.conbini.tests.await")
let cancellable = Just(())
.delay(for: 10, scheduler: queue)
.sink { XCTFail("The await test failed") }

let greeting = publisher.await
XCTAssertEqual(greeting, "Hello")

cancellable.cancel()
}
}

0 comments on commit cff128f

Please sign in to comment.