diff --git a/Sources/SotoCore/AWSClient+EndpointDiscovery+async.swift b/Sources/SotoCore/AWSClient+EndpointDiscovery+async.swift index 158003822..e7f31d7ec 100644 --- a/Sources/SotoCore/AWSClient+EndpointDiscovery+async.swift +++ b/Sources/SotoCore/AWSClient+EndpointDiscovery+async.swift @@ -32,10 +32,8 @@ extension AWSClient { httpMethod: HTTPMethod, serviceConfig: AWSServiceConfig, endpointDiscovery: AWSEndpointDiscovery, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil + logger: Logger = AWSClient.loggingDisabled ) async throws { - let eventLoop = eventLoop ?? eventLoopGroup.next() return try await self.execute( execute: { endpoint in return try await self.execute( @@ -43,13 +41,11 @@ extension AWSClient { path: path, httpMethod: httpMethod, serviceConfig: endpoint.map { serviceConfig.with(patch: .init(endpoint: $0)) } ?? serviceConfig, - logger: logger, - on: eventLoop + logger: logger ) }, isEnabled: serviceConfig.options.contains(.enableEndpointDiscovery), endpointDiscovery: endpointDiscovery, - eventLoop: eventLoop, logger: logger ) } @@ -73,10 +69,8 @@ extension AWSClient { input: Input, hostPrefix: String? = nil, endpointDiscovery: AWSEndpointDiscovery, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil + logger: Logger = AWSClient.loggingDisabled ) async throws { - let eventLoop = eventLoop ?? eventLoopGroup.next() return try await self.execute( execute: { endpoint in return try await self.execute( @@ -86,13 +80,11 @@ extension AWSClient { serviceConfig: endpoint.map { serviceConfig.with(patch: .init(endpoint: $0)) } ?? serviceConfig, input: input, hostPrefix: hostPrefix, - logger: logger, - on: eventLoop + logger: logger ) }, isEnabled: serviceConfig.options.contains(.enableEndpointDiscovery), endpointDiscovery: endpointDiscovery, - eventLoop: eventLoop, logger: logger ) } @@ -114,10 +106,8 @@ extension AWSClient { httpMethod: HTTPMethod, serviceConfig: AWSServiceConfig, endpointDiscovery: AWSEndpointDiscovery, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil + logger: Logger = AWSClient.loggingDisabled ) async throws -> Output { - let eventLoop = eventLoop ?? eventLoopGroup.next() return try await self.execute( execute: { endpoint in return try await self.execute( @@ -125,13 +115,11 @@ extension AWSClient { path: path, httpMethod: httpMethod, serviceConfig: endpoint.map { serviceConfig.with(patch: .init(endpoint: $0)) } ?? serviceConfig, - logger: logger, - on: eventLoop + logger: logger ) }, isEnabled: serviceConfig.options.contains(.enableEndpointDiscovery), endpointDiscovery: endpointDiscovery, - eventLoop: eventLoop, logger: logger ) } @@ -157,10 +145,8 @@ extension AWSClient { input: Input, hostPrefix: String? = nil, endpointDiscovery: AWSEndpointDiscovery, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil + logger: Logger = AWSClient.loggingDisabled ) async throws -> Output { - let eventLoop = eventLoop ?? eventLoopGroup.next() return try await self.execute( execute: { endpoint in return try await self.execute( @@ -170,13 +156,11 @@ extension AWSClient { serviceConfig: endpoint.map { serviceConfig.with(patch: .init(endpoint: $0)) } ?? serviceConfig, input: input, hostPrefix: hostPrefix, - logger: logger, - on: eventLoop + logger: logger ) }, isEnabled: serviceConfig.options.contains(.enableEndpointDiscovery), endpointDiscovery: endpointDiscovery, - eventLoop: eventLoop, logger: logger ) } @@ -204,10 +188,8 @@ extension AWSClient { hostPrefix: String? = nil, endpointDiscovery: AWSEndpointDiscovery, logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil, stream: @escaping AWSResponseStream ) async throws -> Output { - let eventLoop = eventLoop ?? eventLoopGroup.next() return try await self.execute( execute: { endpoint in return try await self.execute( @@ -218,13 +200,11 @@ extension AWSClient { input: input, hostPrefix: hostPrefix, logger: logger, - on: eventLoop, stream: stream ) }, isEnabled: serviceConfig.options.contains(.enableEndpointDiscovery), endpointDiscovery: endpointDiscovery, - eventLoop: eventLoop, logger: logger ) } @@ -233,21 +213,17 @@ extension AWSClient { execute: @escaping (String?) async throws -> Output, isEnabled: Bool, endpointDiscovery: AWSEndpointDiscovery, - eventLoop: EventLoop, logger: Logger ) async throws -> Output { guard isEnabled || endpointDiscovery.isRequired else { return try await execute(nil) } // get endpoint if endpointDiscovery.isExpiring(within: 3 * 60) { do { - let endpointTask = Task { () -> String in - logger.trace("Request endpoint") - let endpoint = try await endpointDiscovery.getEndpoint(logger: logger, on: eventLoop).get() - logger.trace("Received endpoint \(endpoint)") - return endpoint - } + logger.trace("Request endpoint") + let endpoint = try await endpointDiscovery.getEndpoint(logger: logger, on: self.eventLoopGroup.any()).get() + logger.trace("Received endpoint \(endpoint)") + if endpointDiscovery.isRequired { - let endpoint = try await endpointTask.value return try await execute(endpoint) } else { return try await execute(nil) diff --git a/Sources/SotoCore/AWSClient+async.swift b/Sources/SotoCore/AWSClient+async.swift index 11c79d332..13a4886fc 100644 --- a/Sources/SotoCore/AWSClient+async.swift +++ b/Sources/SotoCore/AWSClient+async.swift @@ -60,7 +60,6 @@ extension AWSClient { /// - input: Input object /// - hostPrefix: String to prefix host name with /// - logger: Logger to log request details to - /// - eventLoop: EventLoop to run request on public func execute( operation operationName: String, path: String, @@ -68,8 +67,7 @@ extension AWSClient { serviceConfig: AWSServiceConfig, input: Input, hostPrefix: String? = nil, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil + logger: Logger = AWSClient.loggingDisabled ) async throws { return try await self.execute( operation: operationName, @@ -84,14 +82,13 @@ extension AWSClient { ) }, execute: { request, eventLoop, logger in - return self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) + return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) }, processResponse: { _ in return }, config: serviceConfig, - logger: logger, - on: eventLoop + logger: logger ) } @@ -102,14 +99,12 @@ extension AWSClient { /// - httpMethod: HTTP method to use ("GET", "PUT", "PUSH" etc) /// - serviceConfig: AWS Service configuration /// - logger: Logger to log request details to - /// - eventLoop: EventLoop to run request on public func execute( operation operationName: String, path: String, httpMethod: HTTPMethod, serviceConfig: AWSServiceConfig, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil + logger: Logger = AWSClient.loggingDisabled ) async throws { return try await self.execute( operation: operationName, @@ -122,14 +117,13 @@ extension AWSClient { ) }, execute: { request, eventLoop, logger in - return self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) + return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) }, processResponse: { _ in return }, config: serviceConfig, - logger: logger, - on: eventLoop + logger: logger ) } @@ -140,7 +134,6 @@ extension AWSClient { /// - httpMethod: HTTP method to use ("GET", "PUT", "PUSH" etc) /// - serviceConfig: AWS Service configuration /// - logger: Logger to log request details to - /// - eventLoop: EventLoop to run request on /// - returns: /// Output object that completes when response is received public func execute( @@ -148,8 +141,7 @@ extension AWSClient { path: String, httpMethod: HTTPMethod, serviceConfig: AWSServiceConfig, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil + logger: Logger = AWSClient.loggingDisabled ) async throws -> Output { return try await self.execute( operation: operationName, @@ -162,14 +154,13 @@ extension AWSClient { ) }, execute: { request, eventLoop, logger in - return self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) + return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) }, processResponse: { response in return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) }, config: serviceConfig, - logger: logger, - on: eventLoop + logger: logger ) } @@ -182,7 +173,6 @@ extension AWSClient { /// - input: Input object /// - hostPrefix: String to prefix host name with /// - logger: Logger to log request details to - /// - eventLoop: EventLoop to run request on /// - returns: /// Output object that completes when response is received public func execute( @@ -192,8 +182,7 @@ extension AWSClient { serviceConfig: AWSServiceConfig, input: Input, hostPrefix: String? = nil, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil + logger: Logger = AWSClient.loggingDisabled ) async throws -> Output { return try await self.execute( operation: operationName, @@ -208,14 +197,13 @@ extension AWSClient { ) }, execute: { request, eventLoop, logger in - return self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) + return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger) }, processResponse: { response in return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) }, config: serviceConfig, - logger: logger, - on: eventLoop + logger: logger ) } @@ -228,7 +216,6 @@ extension AWSClient { /// - input: Input object /// - hostPrefix: String to prefix host name with /// - logger: Logger to log request details to - /// - eventLoop: EventLoop to run request on /// - returns: /// Output object that completes when response is received public func execute( @@ -239,7 +226,6 @@ extension AWSClient { input: Input, hostPrefix: String? = nil, logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil, stream: @escaping AWSResponseStream ) async throws -> Output { return try await self.execute( @@ -255,14 +241,13 @@ extension AWSClient { ) }, execute: { request, eventLoop, logger in - return self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger, stream: stream) + return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: eventLoop, logger: logger, stream: stream) }, processResponse: { response in return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) }, config: serviceConfig, - logger: logger, - on: eventLoop + logger: logger ) } @@ -270,18 +255,17 @@ extension AWSClient { internal func execute( operation operationName: String, createRequest: @escaping () throws -> AWSRequest, - execute: @escaping (AWSHTTPRequest, EventLoop, Logger) -> EventLoopFuture, + execute: @escaping (AWSHTTPRequest, EventLoop, Logger) async throws -> AWSHTTPResponse, processResponse: @escaping (AWSHTTPResponse) throws -> Output, config: AWSServiceConfig, - logger: Logger = AWSClient.loggingDisabled, - on eventLoop: EventLoop? = nil + logger: Logger = AWSClient.loggingDisabled ) async throws -> Output { - let eventLoop = eventLoop ?? eventLoopGroup.next() let logger = logger.attachingRequestId( Self.globalRequestID.wrappingIncrementThenLoad(ordering: .relaxed), operation: operationName, service: config.service ) + let eventLoop = self.eventLoopGroup.any() let dimensions: [(String, String)] = [("aws-service", config.service), ("aws-operation", operationName)] let startTime = DispatchTime.now().uptimeNanoseconds @@ -309,10 +293,10 @@ extension AWSClient { with: config, eventLoop: eventLoop, logger: logger, - request: { eventLoop in execute(awsRequest, eventLoop, logger) }, + request: { eventLoop in try await execute(awsRequest, eventLoop, logger) }, processResponse: processResponse, streaming: streaming - ).get() + ) logger.trace("AWS Response") Metrics.Timer( label: "aws_request_duration", @@ -333,6 +317,86 @@ extension AWSClient { } } + func invoke( + with serviceConfig: AWSServiceConfig, + eventLoop: EventLoop, + logger: Logger, + request: @escaping (EventLoop) async throws -> AWSHTTPResponse, + processResponse: @escaping (AWSHTTPResponse) throws -> Output, + streaming: Bool + ) async throws -> Output { + var attempt = 0 + while true { + do { + let response = try await request(eventLoop) + // if it returns an HTTP status code outside 2xx then throw an error + guard (200..<300).contains(response.status.code) else { + throw self.createError(for: response, serviceConfig: serviceConfig, logger: logger) + } + let output = try processResponse(response) + return output + } catch { + // if streaming and the error returned is an AWS error fail immediately. Do not attempt + // to retry as the streaming function will not know you are retrying + if streaming, + error is AWSErrorType || error is AWSRawError + { + throw error + } + // If I get a retry wait time for this error then attempt to retry request + if case .retry(let retryTime) = self.retryPolicy.getRetryWaitTime(error: error, attempt: attempt) { + logger.trace("Retrying request", metadata: [ + "aws-retry-time": "\(Double(retryTime.nanoseconds) / 1_000_000_000)", + ]) + try await Task.sleep(nanoseconds: UInt64(retryTime.nanoseconds)) + } else { + throw error + } + } + attempt += 1 + } + } + + /* func execute(attempt: Int) { + // execute HTTP request + _ = request(eventLoop) + .flatMapThrowing { response throws -> Void in + // if it returns an HTTP status code outside 2xx then throw an error + guard (200..<300).contains(response.status.code) else { + throw self.createError(for: response, serviceConfig: serviceConfig, logger: logger) + } + let output = try processResponse(response) + promise.succeed(output) + } + .flatMapErrorThrowing { error -> Void in + // if streaming and the error returned is an AWS error fail immediately. Do not attempt + // to retry as the streaming function will not know you are retrying + if streaming, + error is AWSErrorType || error is AWSRawError + { + promise.fail(error) + return + } + // If I get a retry wait time for this error then attempt to retry request + if case .retry(let retryTime) = self.retryPolicy.getRetryWaitTime(error: error, attempt: attempt) { + logger.trace("Retrying request", metadata: [ + "aws-retry-time": "\(Double(retryTime.nanoseconds) / 1_000_000_000)", + ]) + // schedule task for retrying AWS request + eventLoop.scheduleTask(in: retryTime) { + execute(attempt: attempt + 1) + } + } else { + promise.fail(error) + } + } + } + + execute(attempt: 0) + + return promise.futureResult + } + } */ /// Get credential used by client /// - Parameters: /// - eventLoop: optional eventLoop to run operation on diff --git a/Sources/SotoCore/HTTP/AsyncHTTPClient.swift b/Sources/SotoCore/HTTP/AsyncHTTPClient.swift index 2cad53310..b10b2807a 100644 --- a/Sources/SotoCore/HTTP/AsyncHTTPClient.swift +++ b/Sources/SotoCore/HTTP/AsyncHTTPClient.swift @@ -97,3 +97,31 @@ extension AsyncHTTPClient.HTTPClient { } extension AsyncHTTPClient.HTTPClient.Response: AWSHTTPResponse {} + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension AsyncHTTPClient.HTTPClient { + /// Execute HTTP request + /// - Parameters: + /// - request: HTTP request + /// - timeout: If execution is idle for longer than timeout then throw error + /// - eventLoop: eventLoop to run request on + /// - Returns: EventLoopFuture that will be fulfilled with request response + func execute( + request: AWSHTTPRequest, + timeout: TimeAmount, + on eventLoop: EventLoop, + logger: Logger + ) async throws -> AWSHTTPResponse { + try await self.execute(request: request, timeout: timeout, on: eventLoop, logger: logger).get() + } + + func execute( + request: AWSHTTPRequest, + timeout: TimeAmount, + on eventLoop: EventLoop, + logger: Logger, + stream: @escaping AWSResponseStream + ) async throws -> AWSHTTPResponse { + try await self.execute(request: request, timeout: timeout, on: eventLoop, logger: logger, stream: stream).get() + } +} diff --git a/Tests/SotoCoreTests/AWSClientTests.swift b/Tests/SotoCoreTests/AWSClientTests.swift index edda569c4..fd3cbc684 100644 --- a/Tests/SotoCoreTests/AWSClientTests.swift +++ b/Tests/SotoCoreTests/AWSClientTests.swift @@ -728,14 +728,12 @@ class AWSClientTests: XCTestCase { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) XCTAssertNoThrow(try awsServer.stop()) } - let eventLoop = client.eventLoopGroup.next() async let responseTask: Void = client.execute( operation: "test", path: "/", httpMethod: .POST, serviceConfig: config, - logger: TestEnvironment.logger, - on: eventLoop + logger: TestEnvironment.logger ) try awsServer.processRaw { _ in diff --git a/Tests/SotoCoreTests/EndpointDiscoveryTests.swift b/Tests/SotoCoreTests/EndpointDiscoveryTests.swift index 1d8b2cf7a..95df1fd8a 100644 --- a/Tests/SotoCoreTests/EndpointDiscoveryTests.swift +++ b/Tests/SotoCoreTests/EndpointDiscoveryTests.swift @@ -70,8 +70,7 @@ final class EndpointDiscoveryTests: XCTestCase { serviceConfig: self.config, input: input, endpointDiscovery: .init(storage: self.endpointStorage, discover: self.getEndpoints, required: true), - logger: logger, - on: eventLoop + logger: logger ) } @@ -89,8 +88,7 @@ final class EndpointDiscoveryTests: XCTestCase { httpMethod: .GET, serviceConfig: self.config, endpointDiscovery: .init(storage: self.endpointStorage, discover: self.getEndpointsDontCache, required: true), - logger: logger, - on: eventLoop + logger: logger ) } @@ -102,8 +100,7 @@ final class EndpointDiscoveryTests: XCTestCase { serviceConfig: self.config, input: input, endpointDiscovery: .init(storage: self.endpointStorage, discover: self.getEndpoints, required: false), - logger: logger, - on: eventLoop + logger: logger ) } } diff --git a/Tests/SotoCoreTests/PaginateTests.swift b/Tests/SotoCoreTests/PaginateTests.swift index bec58b055..61960288a 100644 --- a/Tests/SotoCoreTests/PaginateTests.swift +++ b/Tests/SotoCoreTests/PaginateTests.swift @@ -76,8 +76,7 @@ final class PaginateAsyncTests: XCTestCase, @unchecked Sendable { httpMethod: .POST, serviceConfig: self.config, input: input, - logger: logger, - on: eventLoop + logger: logger ) } @@ -98,8 +97,7 @@ final class PaginateAsyncTests: XCTestCase, @unchecked Sendable { httpMethod: .POST, serviceConfig: self.config, input: input, - logger: logger, - on: eventLoop + logger: logger ) }