Skip to content

Commit

Permalink
Remove EventLoop version of waiter APIs (#551)
Browse files Browse the repository at this point in the history
* Remove EventLoop version of waiters

* Speed up waiter tests
  • Loading branch information
adam-fowler committed Jul 5, 2023
1 parent 61a01fd commit 60a9df1
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 96 deletions.
50 changes: 49 additions & 1 deletion Sources/SotoCore/Waiters/AWSClient+Waiter+async.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,54 @@ extension AWSClient {
logger: Logger = AWSClient.loggingDisabled,
on eventLoop: EventLoop? = nil
) async throws {
return try await self.waitUntil(input, waiter: waiter, maxWaitTime: maxWaitTime, logger: logger, on: eventLoop).get()
let maxWaitTime = maxWaitTime ?? waiter.maxDelayTime
let deadline: NIODeadline = .now() + maxWaitTime
let eventLoop = eventLoop ?? eventLoopGroup.next()

var attempt = 0
while true {
attempt += 1
let result: Result<Output, Error>
do {
result = try .success(await waiter.command(input, logger, eventLoop).get())
} catch {
result = .failure(error)
}
var acceptorState: WaiterState?
for acceptor in waiter.acceptors {
if acceptor.matcher.match(result: result.map { $0 }) {
acceptorState = acceptor.state
break
}
}
// if state has not been set then set it based on return of API call
let waiterState: WaiterState
if let state = acceptorState {
waiterState = state
} else if case .failure = result {
waiterState = .failure
} else {
waiterState = .retry
}
// based on state succeed, fail promise or retry
switch waiterState {
case .success:
return
case .failure:
if case .failure(let error) = result {
throw error
} else {
throw ClientError.waiterFailed
}
case .retry:
let wait = waiter.calculateRetryWaitTime(attempt: attempt, remainingTime: deadline - .now())
if wait < .seconds(0) {
throw ClientError.waiterTimeout
} else {
logger.trace("Wait \(wait.nanoseconds / 1_000_000)ms")
try await Task.sleep(nanoseconds: UInt64(wait.nanoseconds))
}
}
}
}
}
88 changes: 10 additions & 78 deletions Sources/SotoCore/Waiters/AWSClient+Waiter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ import NIOCore

extension AWSClient {
/// Waiter state
public enum WaiterState {
public enum WaiterState: Sendable {
case success
case retry
case failure
}

/// A waiter is a client side abstraction used to poll a resource until a desired state is reached
public struct Waiter<Input: Sendable, Output: Sendable> {
public struct Waiter<Input: Sendable, Output: Sendable>: Sendable {
/// An acceptor checks the result of a call and can change the waiter state based on that result
public struct Acceptor {
public struct Acceptor: Sendable {
public init(state: AWSClient.WaiterState, matcher: AWSWaiterMatcher) {
self.state = state
self.matcher = matcher
Expand All @@ -39,7 +39,7 @@ extension AWSClient {
let matcher: AWSWaiterMatcher
}

public typealias WaiterCommand = (Input, Logger, EventLoop?) -> EventLoopFuture<Output>
public typealias WaiterCommand = @Sendable (Input, Logger, EventLoop?) -> EventLoopFuture<Output>

/// Initialize an waiter
/// - Parameters:
Expand All @@ -66,7 +66,13 @@ extension AWSClient {

/// Calculate delay until next API call. This calculation comes from the AWS Smithy documentation
/// https://awslabs.github.io/smithy/1.0/spec/waiters.html#waiter-retries
///
/// - Parameters:
/// - attempt: Attempt number (assumes this starts at 1)
/// - remainingTime: Remaining time available
/// - Returns: Calculate retry time
func calculateRetryWaitTime(attempt: Int, remainingTime: TimeAmount) -> TimeAmount {
assert(attempt >= 1, "Attempt number cannot be less than 1")
let minDelay = Double(self.minDelayTime.nanoseconds) / 1_000_000_000
let maxDelay = Double(self.maxDelayTime.nanoseconds) / 1_000_000_000
let attemptCeiling = (log(maxDelay / minDelay) / log(2)) + 1
Expand All @@ -85,78 +91,4 @@ extension AWSClient {
return timeDelay
}
}

/// Returns an `EventLoopFuture` that will by fulfilled once waiter polling returns a success state
/// or returns an error if the polling returns an error or timesout
///
/// - Parameters:
/// - input: Input parameters
/// - waiter: Waiter to wait on
/// - maxWaitTime: Maximum amount of time to wait
/// - logger: Logger used to provide output
/// - eventLoop: EventLoop to run API calls on
/// - Returns: EventLoopFuture that will be fulfilled once waiter has completed
public func waitUntil<Input, Output>(
_ input: Input,
waiter: Waiter<Input, Output>,
maxWaitTime: TimeAmount? = nil,
logger: Logger = AWSClient.loggingDisabled,
on eventLoop: EventLoop? = nil
) -> EventLoopFuture<Void> {
let maxWaitTime = maxWaitTime ?? waiter.maxDelayTime
let deadline: NIODeadline = .now() + maxWaitTime
let eventLoop = eventLoop ?? eventLoopGroup.next()
let promise = eventLoop.makePromise(of: Void.self)

func attempt(number: Int) {
waiter.command(input, logger, eventLoop)
.whenComplete { result in
var acceptorState: WaiterState?
for acceptor in waiter.acceptors {
if acceptor.matcher.match(result: result.map { $0 }) {
acceptorState = acceptor.state
break
}
}
// if state has not been set then set it based on return of API call
let waiterState: WaiterState
if let state = acceptorState {
waiterState = state
} else if case .failure = result {
waiterState = .failure
} else {
waiterState = .retry
}
// based on state succeed, fail promise or retry
switch waiterState {
case .success:
promise.succeed(())
case .failure:
if case .failure(let error) = result {
promise.fail(error)
} else {
promise.fail(ClientError.waiterFailed)
}
case .retry:
let wait = waiter.calculateRetryWaitTime(attempt: number, remainingTime: deadline - .now())
if wait < .seconds(0) {
promise.fail(ClientError.waiterTimeout)
} else {
logger.trace("Wait \(wait.nanoseconds / 1_000_000)ms")
eventLoop.scheduleTask(in: wait) { attempt(number: number + 1) }
}
}
}
}
attempt(number: 1)
return promise.futureResult
}
}

extension AWSClient.WaiterState: Sendable {}
extension AWSClient.Waiter.Acceptor: Sendable {}
// I could require the Waiter.command to be Sendable, but it just generates
// pain elsewhere where I have to mark all the API functions to be @Sendable
// which then requires multiple versions of those function if I am going to
// support backwards compatiblity
extension AWSClient.Waiter: @unchecked Sendable {}
34 changes: 17 additions & 17 deletions Tests/SotoCoreTests/WaiterTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class WaiterTests: XCTestCase {
let i: Int
}

func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<Output> {
@Sendable func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<Output> {
self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop)
}

Expand All @@ -59,7 +59,7 @@ class WaiterTests: XCTestCase {
let array: [Element]
}

func arrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<ArrayOutput> {
@Sendable func arrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<ArrayOutput> {
self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop)
}

Expand All @@ -74,7 +74,7 @@ class WaiterTests: XCTestCase {
let array: [Element]?
}

func optionalArrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<OptionalArrayOutput> {
@Sendable func optionalArrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<OptionalArrayOutput> {
self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop)
}

Expand All @@ -83,7 +83,7 @@ class WaiterTests: XCTestCase {
acceptors: [
.init(state: .success, matcher: try! JMESPathMatcher("array[*].status", expected: [true, true, true])),
],
minDelayTime: .seconds(2),
minDelayTime: .milliseconds(2),
command: self.arrayOperation
)
let input = Input()
Expand All @@ -102,14 +102,14 @@ class WaiterTests: XCTestCase {
struct StringOutput: AWSDecodableShape & Encodable {
let s: String
}
func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<StringOutput> {
@Sendable func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<StringOutput> {
self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop)
}
let waiter = AWSClient.Waiter(
acceptors: [
.init(state: .success, matcher: try! JMESPathMatcher("s", expected: "yes")),
],
minDelayTime: .seconds(2),
minDelayTime: .milliseconds(2),
command: operation
)
let input = Input()
Expand Down Expand Up @@ -137,14 +137,14 @@ class WaiterTests: XCTestCase {
struct EnumOutput: AWSDecodableShape & Encodable {
let e: YesNo
}
func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<EnumOutput> {
@Sendable func operation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<EnumOutput> {
self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: self.config, input: input, logger: logger, on: eventLoop)
}
let waiter = AWSClient.Waiter(
acceptors: [
.init(state: .success, matcher: try! JMESPathMatcher("e", expected: "YES")),
],
minDelayTime: .seconds(2),
minDelayTime: .milliseconds(2),
command: operation
)
let input = Input()
Expand All @@ -168,7 +168,7 @@ class WaiterTests: XCTestCase {
acceptors: [
.init(state: .success, matcher: try! JMESAnyPathMatcher("array[*].status", expected: true)),
],
minDelayTime: .seconds(2),
minDelayTime: .milliseconds(2),
command: self.arrayOperation
)
let input = Input()
Expand All @@ -192,7 +192,7 @@ class WaiterTests: XCTestCase {
acceptors: [
.init(state: .success, matcher: try! JMESAllPathMatcher("array[*].status", expected: true)),
],
minDelayTime: .seconds(2),
minDelayTime: .milliseconds(2),
command: self.arrayOperation
)
let input = Input()
Expand Down Expand Up @@ -222,15 +222,15 @@ class WaiterTests: XCTestCase {
defer { XCTAssertNoThrow(try awsServer.stop()) }
let config = createServiceConfig(serviceProtocol: .restxml, endpoint: awsServer.address)

func arrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<ArrayOutput> {
@Sendable func arrayOperation(input: Input, logger: Logger, eventLoop: EventLoop?) -> EventLoopFuture<ArrayOutput> {
self.client.execute(operation: "Basic", path: "/", httpMethod: .POST, serviceConfig: config, input: input, logger: logger, on: eventLoop)
}

let waiter = AWSClient.Waiter(
acceptors: [
.init(state: .success, matcher: try! JMESPathMatcher("array[*]", expected: [true, true, true])),
],
minDelayTime: .seconds(2),
minDelayTime: .milliseconds(2),
command: arrayOperation
)
let input = Input(test: "Input")
Expand All @@ -250,12 +250,12 @@ class WaiterTests: XCTestCase {
acceptors: [
.init(state: .success, matcher: try! JMESPathMatcher("i", expected: 3)),
],
minDelayTime: .seconds(2),
maxDelayTime: .seconds(4),
minDelayTime: .milliseconds(200),
maxDelayTime: .milliseconds(400),
command: self.operation
)
let input = Input()
async let responseTask: Void = self.client.waitUntil(input, waiter: waiter, maxWaitTime: .seconds(4), logger: TestEnvironment.logger)
async let responseTask: Void = self.client.waitUntil(input, waiter: waiter, maxWaitTime: .milliseconds(400), logger: TestEnvironment.logger)

var i = 0
XCTAssertNoThrow(try self.awsServer.process { (_: Input) -> AWSTestServer.Result<Output> in
Expand All @@ -277,7 +277,7 @@ class WaiterTests: XCTestCase {
.init(state: .retry, matcher: AWSErrorCodeMatcher("AccessDenied")),
.init(state: .success, matcher: try! JMESPathMatcher("i", expected: 3)),
],
minDelayTime: .seconds(2),
minDelayTime: .milliseconds(2),
command: self.operation
)
let input = Input()
Expand All @@ -302,7 +302,7 @@ class WaiterTests: XCTestCase {
.init(state: .retry, matcher: AWSErrorStatusMatcher(404)),
.init(state: .success, matcher: try! JMESPathMatcher("i", expected: 3)),
],
minDelayTime: .seconds(2),
minDelayTime: .milliseconds(2),
command: self.operation
)
let input = Input()
Expand Down

0 comments on commit 60a9df1

Please sign in to comment.