Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Detached tasks #334

Merged
merged 14 commits into from
Aug 23, 2024
95 changes: 95 additions & 0 deletions Sources/AWSLambdaRuntimeCore/DetachedTasks.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Foundation
import NIOConcurrencyHelpers
import NIOCore
import Logging

/// A container that allows tasks to finish after a synchronous invocation
/// has produced its response.
actor DetachedTasksContainer: Sendable {

struct Context: Sendable {
let eventLoop: EventLoop
let logger: Logger
}

private var context: Context
private var storage: [RegistrationKey: EventLoopFuture<Void>] = [:]

init(context: Context) {
self.context = context
}

/// Adds a detached async task.
///
/// - Parameters:
/// - name: The name of the task.
/// - task: The async task to execute.
/// - Returns: A `RegistrationKey` for the registered task.
func detached(task: @Sendable @escaping () async -> Void) {
let key = RegistrationKey()
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask(task)
let task = promise.futureResult.always { [weak self] result in
guard let self else { return }
Task {
await self.removeTask(forKey: key)
}
}
self.storage[key] = task
}

func removeTask(forKey key: RegistrationKey) {
self.storage.removeValue(forKey: key)
}

/// Awaits all registered tasks to complete.
///
/// - Returns: An `EventLoopFuture<Void>` that completes when all tasks have finished.
func awaitAll() -> EventLoopFuture<Void> {
let tasks = storage.values
if tasks.isEmpty {
return context.eventLoop.makeSucceededVoidFuture()
} else {
let context = context
return EventLoopFuture.andAllComplete(Array(tasks), on: context.eventLoop).flatMap { [weak self] in
guard let self else {
return context.eventLoop.makeSucceededFuture(())
}
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
try await self.awaitAll().get()
}
return promise.futureResult
}
}
}
}

extension DetachedTasksContainer {
/// Lambda detached task registration key.
struct RegistrationKey: Hashable, CustomStringConvertible, Sendable {
var value: String

init() {
// UUID basically
self.value = UUID().uuidString
}

var description: String {
self.value
}
}
}
30 changes: 28 additions & 2 deletions Sources/AWSLambdaRuntimeCore/LambdaContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
let logger: Logger
let eventLoop: EventLoop
let allocator: ByteBufferAllocator
let tasks: DetachedTasksContainer

init(
requestID: String,
Expand All @@ -91,7 +92,8 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
clientContext: String?,
logger: Logger,
eventLoop: EventLoop,
allocator: ByteBufferAllocator
allocator: ByteBufferAllocator,
tasks: DetachedTasksContainer
) {
self.requestID = requestID
self.traceID = traceID
Expand All @@ -102,6 +104,7 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
self.logger = logger
self.eventLoop = eventLoop
self.allocator = allocator
self.tasks = tasks
}
}

Expand Down Expand Up @@ -177,7 +180,13 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
clientContext: clientContext,
logger: logger,
eventLoop: eventLoop,
allocator: allocator
allocator: allocator,
tasks: DetachedTasksContainer(
context: DetachedTasksContainer.Context(
eventLoop: eventLoop,
logger: logger
)
)
)
}

Expand All @@ -188,6 +197,23 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
let remaining = deadline - now
return .milliseconds(remaining)
}

var tasks: DetachedTasksContainer {
self.storage.tasks
}


/// Registers a background task that continues running after the synchronous invocation has completed.
/// This is useful for tasks like flushing metrics or performing clean-up operations without delaying the response.
///
/// - Parameter body: An asynchronous closure that performs the background task.
/// - Warning: You will be billed for the milliseconds of Lambda execution time until the very last
/// background task is finished.
public func detachedBackgroundTask(_ body: @escaping @Sendable () async -> ()) {
Task {
await self.tasks.detached(task: body)
}
}

public var debugDescription: String {
"\(Self.self)(requestID: \(self.requestID), traceID: \(self.traceID), invokedFunctionARN: \(self.invokedFunctionARN), cognitoIdentity: \(self.cognitoIdentity ?? "nil"), clientContext: \(self.clientContext ?? "nil"), deadline: \(self.deadline))"
Expand Down
18 changes: 16 additions & 2 deletions Sources/AWSLambdaRuntimeCore/LambdaRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,27 @@ internal final class LambdaRunner {
if case .failure(let error) = result {
logger.warning("lambda handler returned an error: \(error)")
}
return (invocation, result)
return (invocation, result, context)
}
}.flatMap { invocation, result in
}.flatMap { invocation, result, context in
// 3. report results to runtime engine
self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in
logger.error("could not report results to lambda runtime engine: \(error)")
// To discuss:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only point that remains open @fabianfett. When the runtime fails to report a result to AWS Lambda, do we want to wait for the background tasks to complete before stopping the execution of the whole process?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the error case we should await all subtasks.

// Do we want to await the tasks in this case?
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
return try await context.tasks.awaitAll().get()
}
return promise.futureResult
}.map { _ in context }
}
.flatMap { (context: LambdaContext) -> EventLoopFuture<Void> in
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask {
try await context.tasks.awaitAll().get()
}
return promise.futureResult
}
}

Expand Down
80 changes: 80 additions & 0 deletions Tests/AWSLambdaRuntimeCoreTests/DetachedTasksTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@testable import AWSLambdaRuntimeCore
import NIO
import XCTest
import Logging

class DetachedTasksTest: XCTestCase {

actor Expectation {
var isFulfilled = false
func fulfill() {
isFulfilled = true
}
}

func testAwaitTasks() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

let context = DetachedTasksContainer.Context(
eventLoop: eventLoopGroup.next(),
logger: Logger(label: "test")
)
let expectation = Expectation()

let container = DetachedTasksContainer(context: context)
await container.detached {
try! await Task.sleep(for: .milliseconds(200))
await expectation.fulfill()
}

try await container.awaitAll().get()
let isFulfilled = await expectation.isFulfilled
XCTAssert(isFulfilled)
}

func testAwaitChildrenTasks() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

let context = DetachedTasksContainer.Context(
eventLoop: eventLoopGroup.next(),
logger: Logger(label: "test")
)
let expectation1 = Expectation()
let expectation2 = Expectation()

let container = DetachedTasksContainer(context: context)
await container.detached {
await container.detached {
try! await Task.sleep(for: .milliseconds(300))
await expectation1.fulfill()
}
try! await Task.sleep(for: .milliseconds(200))
await container.detached {
try! await Task.sleep(for: .milliseconds(100))
await expectation2.fulfill()
}
}

try await container.awaitAll().get()
let isFulfilled1 = await expectation1.isFulfilled
let isFulfilled2 = await expectation2.isFulfilled
XCTAssert(isFulfilled1)
XCTAssert(isFulfilled2)
}
}
21 changes: 20 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ public protocol SimpleLambdaHandler {

### Context

When calling the user provided Lambda function, the library provides a `LambdaContext` class that provides metadata about the execution context, as well as utilities for logging and allocating buffers.
When calling the user provided Lambda function, the library provides a `LambdaContext` class that provides metadata about the execution context, as well as utilities for logging, allocating buffers and dispatch background tasks.

```swift
public struct LambdaContext: CustomDebugStringConvertible, Sendable {
Expand Down Expand Up @@ -555,6 +555,25 @@ public struct LambdaInitializationContext: Sendable {
}
```

### Background tasks

The detachedBackgroundTask method allows you to register background tasks that continue running even after the Lambda runtime has reported the result of a synchronous invocation. This is particularly useful for integrations with services like API Gateway or CloudFront, where you can quickly return a response without waiting for non-essential tasks such as flushing metrics or performing non-critical clean-up operations.

```swift
@main
struct MyLambda: SimpleLambdaHandler {
func handle(_ request: APIGatewayV2Request, context: LambdaContext) async throws -> APIGatewayV2Response {
let response = makeResponse()
context.detachedBackgroundTask {
try? await Task.sleep(for: .seconds(3))
print("Background task completed")
}
print("Returning response")
return response
}
}
```

### Configuration

The library’s behavior can be fine tuned using environment variables based configuration. The library supported the following environment variables:
Expand Down