diff --git a/Sources/AWSLambdaRuntimeCore/DetachedTasks.swift b/Sources/AWSLambdaRuntimeCore/DetachedTasks.swift new file mode 100644 index 00000000..f06750bf --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/DetachedTasks.swift @@ -0,0 +1,95 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import Foundation +import NIOConcurrencyHelpers +import NIOCore +import Logging + +/// A container that allows tasks to finish after a synchronous invocation +/// has produced its response. +actor DetachedTasksContainer: Sendable { + + struct Context: Sendable { + let eventLoop: EventLoop + let logger: Logger + } + + private var context: Context + private var storage: [RegistrationKey: EventLoopFuture] = [:] + + init(context: Context) { + self.context = context + } + + /// Adds a detached async task. + /// + /// - Parameters: + /// - name: The name of the task. + /// - task: The async task to execute. + /// - Returns: A `RegistrationKey` for the registered task. + func detached(task: @Sendable @escaping () async -> Void) { + let key = RegistrationKey() + let promise = context.eventLoop.makePromise(of: Void.self) + promise.completeWithTask(task) + let task = promise.futureResult.always { [weak self] result in + guard let self else { return } + Task { + await self.removeTask(forKey: key) + } + } + self.storage[key] = task + } + + func removeTask(forKey key: RegistrationKey) { + self.storage.removeValue(forKey: key) + } + + /// Awaits all registered tasks to complete. + /// + /// - Returns: An `EventLoopFuture` that completes when all tasks have finished. + func awaitAll() -> EventLoopFuture { + let tasks = storage.values + if tasks.isEmpty { + return context.eventLoop.makeSucceededVoidFuture() + } else { + let context = context + return EventLoopFuture.andAllComplete(Array(tasks), on: context.eventLoop).flatMap { [weak self] in + guard let self else { + return context.eventLoop.makeSucceededFuture(()) + } + let promise = context.eventLoop.makePromise(of: Void.self) + promise.completeWithTask { + try await self.awaitAll().get() + } + return promise.futureResult + } + } + } +} + +extension DetachedTasksContainer { + /// Lambda detached task registration key. + struct RegistrationKey: Hashable, CustomStringConvertible, Sendable { + var value: String + + init() { + // UUID basically + self.value = UUID().uuidString + } + + var description: String { + self.value + } + } +} diff --git a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift index 24e960a4..943c4e5f 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift @@ -81,6 +81,7 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable { let logger: Logger let eventLoop: EventLoop let allocator: ByteBufferAllocator + let tasks: DetachedTasksContainer init( requestID: String, @@ -91,7 +92,8 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable { clientContext: String?, logger: Logger, eventLoop: EventLoop, - allocator: ByteBufferAllocator + allocator: ByteBufferAllocator, + tasks: DetachedTasksContainer ) { self.requestID = requestID self.traceID = traceID @@ -102,6 +104,7 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable { self.logger = logger self.eventLoop = eventLoop self.allocator = allocator + self.tasks = tasks } } @@ -177,7 +180,13 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable { clientContext: clientContext, logger: logger, eventLoop: eventLoop, - allocator: allocator + allocator: allocator, + tasks: DetachedTasksContainer( + context: DetachedTasksContainer.Context( + eventLoop: eventLoop, + logger: logger + ) + ) ) } @@ -188,6 +197,23 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable { let remaining = deadline - now return .milliseconds(remaining) } + + var tasks: DetachedTasksContainer { + self.storage.tasks + } + + + /// Registers a background task that continues running after the synchronous invocation has completed. + /// This is useful for tasks like flushing metrics or performing clean-up operations without delaying the response. + /// + /// - Parameter body: An asynchronous closure that performs the background task. + /// - Warning: You will be billed for the milliseconds of Lambda execution time until the very last + /// background task is finished. + public func detachedBackgroundTask(_ body: @escaping @Sendable () async -> ()) { + Task { + await self.tasks.detached(task: body) + } + } public var debugDescription: String { "\(Self.self)(requestID: \(self.requestID), traceID: \(self.traceID), invokedFunctionARN: \(self.invokedFunctionARN), cognitoIdentity: \(self.cognitoIdentity ?? "nil"), clientContext: \(self.clientContext ?? "nil"), deadline: \(self.deadline))" diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index 23281b94..87457f43 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -95,13 +95,27 @@ internal final class LambdaRunner { if case .failure(let error) = result { logger.warning("lambda handler returned an error: \(error)") } - return (invocation, result) + return (invocation, result, context) } - }.flatMap { invocation, result in + }.flatMap { invocation, result, context in // 3. report results to runtime engine self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in logger.error("could not report results to lambda runtime engine: \(error)") + // To discuss: + // Do we want to await the tasks in this case? + let promise = context.eventLoop.makePromise(of: Void.self) + promise.completeWithTask { + return try await context.tasks.awaitAll().get() + } + return promise.futureResult + }.map { _ in context } + } + .flatMap { (context: LambdaContext) -> EventLoopFuture in + let promise = context.eventLoop.makePromise(of: Void.self) + promise.completeWithTask { + try await context.tasks.awaitAll().get() } + return promise.futureResult } } diff --git a/Tests/AWSLambdaRuntimeCoreTests/DetachedTasksTests.swift b/Tests/AWSLambdaRuntimeCoreTests/DetachedTasksTests.swift new file mode 100644 index 00000000..b0ec42bc --- /dev/null +++ b/Tests/AWSLambdaRuntimeCoreTests/DetachedTasksTests.swift @@ -0,0 +1,80 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AWSLambdaRuntimeCore +import NIO +import XCTest +import Logging + +class DetachedTasksTest: XCTestCase { + + actor Expectation { + var isFulfilled = false + func fulfill() { + isFulfilled = true + } + } + + func testAwaitTasks() async throws { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let context = DetachedTasksContainer.Context( + eventLoop: eventLoopGroup.next(), + logger: Logger(label: "test") + ) + let expectation = Expectation() + + let container = DetachedTasksContainer(context: context) + await container.detached { + try! await Task.sleep(for: .milliseconds(200)) + await expectation.fulfill() + } + + try await container.awaitAll().get() + let isFulfilled = await expectation.isFulfilled + XCTAssert(isFulfilled) + } + + func testAwaitChildrenTasks() async throws { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let context = DetachedTasksContainer.Context( + eventLoop: eventLoopGroup.next(), + logger: Logger(label: "test") + ) + let expectation1 = Expectation() + let expectation2 = Expectation() + + let container = DetachedTasksContainer(context: context) + await container.detached { + await container.detached { + try! await Task.sleep(for: .milliseconds(300)) + await expectation1.fulfill() + } + try! await Task.sleep(for: .milliseconds(200)) + await container.detached { + try! await Task.sleep(for: .milliseconds(100)) + await expectation2.fulfill() + } + } + + try await container.awaitAll().get() + let isFulfilled1 = await expectation1.isFulfilled + let isFulfilled2 = await expectation2.isFulfilled + XCTAssert(isFulfilled1) + XCTAssert(isFulfilled2) + } +} diff --git a/readme.md b/readme.md index 8a4111e6..a21a55eb 100644 --- a/readme.md +++ b/readme.md @@ -474,7 +474,7 @@ public protocol SimpleLambdaHandler { ### Context -When calling the user provided Lambda function, the library provides a `LambdaContext` class that provides metadata about the execution context, as well as utilities for logging and allocating buffers. +When calling the user provided Lambda function, the library provides a `LambdaContext` class that provides metadata about the execution context, as well as utilities for logging, allocating buffers and dispatch background tasks. ```swift public struct LambdaContext: CustomDebugStringConvertible, Sendable { @@ -555,6 +555,25 @@ public struct LambdaInitializationContext: Sendable { } ``` +### Background tasks + +The detachedBackgroundTask method allows you to register background tasks that continue running even after the Lambda runtime has reported the result of a synchronous invocation. This is particularly useful for integrations with services like API Gateway or CloudFront, where you can quickly return a response without waiting for non-essential tasks such as flushing metrics or performing non-critical clean-up operations. + +```swift + @main + struct MyLambda: SimpleLambdaHandler { + func handle(_ request: APIGatewayV2Request, context: LambdaContext) async throws -> APIGatewayV2Response { + let response = makeResponse() + context.detachedBackgroundTask { + try? await Task.sleep(for: .seconds(3)) + print("Background task completed") + } + print("Returning response") + return response + } + } +``` + ### Configuration The library’s behavior can be fine tuned using environment variables based configuration. The library supported the following environment variables: