Skip to content

Commit

Permalink
Add better promise handling and test (#52)
Browse files Browse the repository at this point in the history
Additional promise tests

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>

Signed-off-by: EandJsFilmCrew <789213+rvsrvs@users.noreply.github.com>
  • Loading branch information
rvsrvs authored Aug 13, 2022
1 parent 4e92118 commit 30239ec
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 12 deletions.
8 changes: 6 additions & 2 deletions Sources/FreeCombine/Decombinator/PromiseReceiveState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Created by Van Simmons on 7/27/22.
//
public struct PromiseReceiveState<Output: Sendable> {
fileprivate let resolution: ValueRef<Result<Output, Swift.Error>?> = .init(value: .none)
var promiseChannel: Channel<PromiseState<Output>.Action>

public enum Error: Swift.Error {
Expand All @@ -28,7 +29,10 @@ public struct PromiseReceiveState<Output: Sendable> {
{ channel in .init(channel: promiseChannel) }
}

static func complete(state: inout Self, completion: Reducer<Self, Self.Action>.Completion) async -> Void { }
static func complete(state: inout Self, completion: Reducer<Self, Self.Action>.Completion) async -> Void {
guard let _ = await state.resolution.get() else { return }
fatalError("Promise completed without resolution")
}

static func dispose(action: Self.Action, completion: Reducer<Self, Self.Action>.Completion) async -> Void {
switch action {
Expand Down Expand Up @@ -60,7 +64,7 @@ public struct PromiseReceiveState<Output: Sendable> {
mutating func reduce(action: Action) async throws -> Reducer<Self, Action>.Effect {
var result: Result<Output, Swift.Error>! = .none
var resumption: Resumption<Int>! = .none
if case let .receive(r1, r3) = action {
if case let .receive(r1, r3) = action {
result = r1
resumption = r3
} else if case let .nonBlockingReceive(r2) = action {
Expand Down
19 changes: 9 additions & 10 deletions Sources/FreeCombine/Promise/Promise.swift
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ extension Promise {
func subscribe(
_ downstream: @escaping @Sendable (Result<Output, Swift.Error>) async throws -> Void
) async throws -> Cancellable<Void> {
try await withResumption { resumption in
if let resolution = await resolution.get() {
return .init { try await downstream(resolution) }
}
return try await withResumption { resumption in
let queueStatus = stateTask.send(.subscribe(downstream, resumption))
switch queueStatus {
case .enqueued:
Expand Down Expand Up @@ -149,9 +152,7 @@ public extension Promise {
}
}

func receive(
_ result: Result<Output, Swift.Error>
) async throws -> Void {
@Sendable func send(_ result: Result<Output, Swift.Error>) async throws -> Void {
try await resolution.swapIfNone(result)
let queueStatus = receiveStateTask.send(.nonBlockingReceive(result))
switch queueStatus {
Expand All @@ -162,23 +163,21 @@ public extension Promise {
throw PublisherError.completed
case .dropped:
await resolution.set(value: .failure(PublisherError.enqueueError))
receiveStateTask.finish()
throw PublisherError.enqueueError
@unknown default:
await resolution.set(value: .failure(PublisherError.enqueueError))
stateTask.finish()
throw PublisherError.enqueueError
}
}

@Sendable func send(_ result: Result<Output, Swift.Error>) async throws -> Void {
try await receive(result)
}

@Sendable func succeed(_ value: Output) async throws -> Void {
try await receive(.success(value))
try await send(.success(value))
}

@Sendable func fail(_ error: Swift.Error) async throws -> Void {
try await receive(.failure(error))
try await send(.failure(error))
}

@Sendable func finish() -> Void {
Expand Down
97 changes: 97 additions & 0 deletions Tests/FreeCombineTests/PromiseTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,101 @@ final class PromiseTests: XCTestCase {
promise.finish()
_ = await promise.result
}

func testMultipleSubscribers() async throws {
let promise = try await Promise<Int>()
let max = 1_000
let range = 0 ..< max

var pairs: [(Expectation<Void>, Cancellable<Void>)] = .init()
for _ in range {
let expectation = await Expectation<Void>()
let cancellation = await promise.future()
.map { $0 * 2 }
.sink ({ result in
do { try await expectation.complete() }
catch { XCTFail("Failed to complete with error: \(error)") }
switch result {
case .success(let value): XCTAssert(value == 26, "Wrong value")
case .failure(let error): XCTFail("Failed with \(error)")
}
})
let pair = (expectation, cancellation)
pairs.append(pair)
}
XCTAssertTrue(pairs.count == max, "Failed to create futures")
try await promise.succeed(13)

do {
for pair in pairs {
try await FreeCombine.wait(for: pair.0, timeout: 10_000_000)
_ = await pair.1.result
}
} catch {
XCTFail("Timed out")
}

promise.finish()
_ = await promise.result
}

func testMultipleSends() async throws {
let promise = try await Promise<Int>()
let max = 1_000
let range = 0 ..< max

var pairs: [(Expectation<Void>, Cancellable<Void>)] = .init()
for _ in range {
let expectation = await Expectation<Void>()
let cancellation = await promise.future()
.map { $0 * 2 }
.sink ({ result in
do { try await expectation.complete() }
catch { XCTFail("Failed to complete with error: \(error)") }
switch result {
case .success(let value): XCTAssert(value == 26, "Wrong value")
case .failure(let error): XCTFail("Failed with \(error)")
}
})
let pair = (expectation, cancellation)
pairs.append(pair)
}
let succeedCounter = Counter()
let failureCounter = Counter()
XCTAssertTrue(pairs.count == max, "Failed to create futures")
let maxAttempts = 100
let _: Void = try await withResumption { resumption in
let semaphore: FreeCombine.Semaphore<Void, Void> = .init(
resumption: resumption,
reducer: { _, _ in return },
initialState: (),
count: maxAttempts
)
for _ in 0 ..< maxAttempts {
Task {
do { try await promise.succeed(13); await succeedCounter.increment(); await semaphore.decrement(with: ()) }
catch { await failureCounter.increment(); await semaphore.decrement(with: ()) }
}
}
}
let successCount = await succeedCounter.count
XCTAssert(successCount == 1, "Too many successes")

let failureCount = await failureCounter.count
XCTAssert(failureCount == maxAttempts - 1, "Too few failures")

do {
for pair in pairs {
try await FreeCombine.wait(for: pair.0, timeout: 10_000_000)
_ = await pair.1.result
}
} catch {
XCTFail("Timed out")
}

promise.finish()
_ = await promise.result
}


}

0 comments on commit 30239ec

Please sign in to comment.