Skip to content

Commit

Permalink
Remove eventLoop from execute commands
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-fowler committed Jun 18, 2023
1 parent 8aaaebc commit d3cd795
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 84 deletions.
48 changes: 12 additions & 36 deletions Sources/SotoCore/AWSClient+EndpointDiscovery+async.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,20 @@ extension AWSClient {
httpMethod: HTTPMethod,
serviceConfig: AWSServiceConfig,
endpointDiscovery: AWSEndpointDiscovery,
logger: Logger = AWSClient.loggingDisabled,
on eventLoop: EventLoop? = nil
logger: Logger = AWSClient.loggingDisabled
) async throws {
let eventLoop = eventLoop ?? eventLoopGroup.next()
return try await self.execute(
execute: { endpoint in
return try await self.execute(
operation: operationName,
path: path,
httpMethod: httpMethod,
serviceConfig: endpoint.map { serviceConfig.with(patch: .init(endpoint: $0)) } ?? serviceConfig,
logger: logger,
on: eventLoop
logger: logger
)
},
isEnabled: serviceConfig.options.contains(.enableEndpointDiscovery),
endpointDiscovery: endpointDiscovery,
eventLoop: eventLoop,
logger: logger
)
}
Expand All @@ -73,10 +69,8 @@ extension AWSClient {
input: Input,
hostPrefix: String? = nil,
endpointDiscovery: AWSEndpointDiscovery,
logger: Logger = AWSClient.loggingDisabled,
on eventLoop: EventLoop? = nil
logger: Logger = AWSClient.loggingDisabled
) async throws {
let eventLoop = eventLoop ?? eventLoopGroup.next()
return try await self.execute(
execute: { endpoint in
return try await self.execute(
Expand All @@ -86,13 +80,11 @@ extension AWSClient {
serviceConfig: endpoint.map { serviceConfig.with(patch: .init(endpoint: $0)) } ?? serviceConfig,
input: input,
hostPrefix: hostPrefix,
logger: logger,
on: eventLoop
logger: logger
)
},
isEnabled: serviceConfig.options.contains(.enableEndpointDiscovery),
endpointDiscovery: endpointDiscovery,
eventLoop: eventLoop,
logger: logger
)
}
Expand All @@ -114,24 +106,20 @@ extension AWSClient {
httpMethod: HTTPMethod,
serviceConfig: AWSServiceConfig,
endpointDiscovery: AWSEndpointDiscovery,
logger: Logger = AWSClient.loggingDisabled,
on eventLoop: EventLoop? = nil
logger: Logger = AWSClient.loggingDisabled
) async throws -> Output {
let eventLoop = eventLoop ?? eventLoopGroup.next()
return try await self.execute(
execute: { endpoint in
return try await self.execute(
operation: operationName,
path: path,
httpMethod: httpMethod,
serviceConfig: endpoint.map { serviceConfig.with(patch: .init(endpoint: $0)) } ?? serviceConfig,
logger: logger,
on: eventLoop
logger: logger
)
},
isEnabled: serviceConfig.options.contains(.enableEndpointDiscovery),
endpointDiscovery: endpointDiscovery,
eventLoop: eventLoop,
logger: logger
)
}
Expand All @@ -157,10 +145,8 @@ extension AWSClient {
input: Input,
hostPrefix: String? = nil,
endpointDiscovery: AWSEndpointDiscovery,
logger: Logger = AWSClient.loggingDisabled,
on eventLoop: EventLoop? = nil
logger: Logger = AWSClient.loggingDisabled
) async throws -> Output {
let eventLoop = eventLoop ?? eventLoopGroup.next()
return try await self.execute(
execute: { endpoint in
return try await self.execute(
Expand All @@ -170,13 +156,11 @@ extension AWSClient {
serviceConfig: endpoint.map { serviceConfig.with(patch: .init(endpoint: $0)) } ?? serviceConfig,
input: input,
hostPrefix: hostPrefix,
logger: logger,
on: eventLoop
logger: logger
)
},
isEnabled: serviceConfig.options.contains(.enableEndpointDiscovery),
endpointDiscovery: endpointDiscovery,
eventLoop: eventLoop,
logger: logger
)
}
Expand Down Expand Up @@ -204,10 +188,8 @@ extension AWSClient {
hostPrefix: String? = nil,
endpointDiscovery: AWSEndpointDiscovery,
logger: Logger = AWSClient.loggingDisabled,
on eventLoop: EventLoop? = nil,
stream: @escaping AWSResponseStream
) async throws -> Output {
let eventLoop = eventLoop ?? eventLoopGroup.next()
return try await self.execute(
execute: { endpoint in
return try await self.execute(
Expand All @@ -218,13 +200,11 @@ extension AWSClient {
input: input,
hostPrefix: hostPrefix,
logger: logger,
on: eventLoop,
stream: stream
)
},
isEnabled: serviceConfig.options.contains(.enableEndpointDiscovery),
endpointDiscovery: endpointDiscovery,
eventLoop: eventLoop,
logger: logger
)
}
Expand All @@ -233,21 +213,17 @@ extension AWSClient {
execute: @escaping (String?) async throws -> Output,
isEnabled: Bool,
endpointDiscovery: AWSEndpointDiscovery,
eventLoop: EventLoop,
logger: Logger
) async throws -> Output {
guard isEnabled || endpointDiscovery.isRequired else { return try await execute(nil) }
// get endpoint
if endpointDiscovery.isExpiring(within: 3 * 60) {
do {
let endpointTask = Task { () -> String in
logger.trace("Request endpoint")
let endpoint = try await endpointDiscovery.getEndpoint(logger: logger, on: eventLoop).get()
logger.trace("Received endpoint \(endpoint)")
return endpoint
}
logger.trace("Request endpoint")
let endpoint = try await endpointDiscovery.getEndpoint(logger: logger, on: self.eventLoopGroup.any()).get()
logger.trace("Received endpoint \(endpoint)")

if endpointDiscovery.isRequired {
let endpoint = try await endpointTask.value
return try await execute(endpoint)
} else {
return try await execute(nil)
Expand Down
Loading

0 comments on commit d3cd795

Please sign in to comment.