diff --git a/Sources/AWSLambdaRuntime/Lambda+Codable.swift b/Sources/AWSLambdaRuntime/Lambda+Codable.swift index c345e553..1f59b8f7 100644 --- a/Sources/AWSLambdaRuntime/Lambda+Codable.swift +++ b/Sources/AWSLambdaRuntime/Lambda+Codable.swift @@ -65,3 +65,56 @@ extension LambdaCodableAdapter { ) } } + +extension NewLambdaRuntime { + /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**. + /// - Parameter body: The handler in the form of a closure. + /// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. ``JSONEncoder()`` used as default. + /// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. ``JSONDecoder()`` used as default. + package convenience init( + body: @escaping (Event, NewLambdaContext) async throws -> Output, + encoder: JSONEncoder = JSONEncoder(), + decoder: JSONDecoder = JSONDecoder() + ) + where + Handler == LambdaCodableAdapter< + LambdaHandlerAdapter>, + Event, + Output, + JSONDecoder, + LambdaJSONOutputEncoder + > + { + let handler = LambdaCodableAdapter( + encoder: encoder, + decoder: decoder, + handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) + ) + + self.init(handler: handler) + } + + /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**. + /// - Parameter body: The handler in the form of a closure. + /// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. ``JSONDecoder()`` used as default. + package convenience init( + body: @escaping (Event, NewLambdaContext) async throws -> Void, + decoder: JSONDecoder = JSONDecoder() + ) + where + Handler == LambdaCodableAdapter< + LambdaHandlerAdapter>, + Event, + Void, + JSONDecoder, + VoidEncoder + > + { + let handler = LambdaCodableAdapter( + decoder: decoder, + handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) + ) + + self.init(handler: handler) + } +} diff --git a/Sources/AWSLambdaRuntimeCore/NewLambda.swift b/Sources/AWSLambdaRuntimeCore/NewLambda.swift index 28eb7df9..d72df20b 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambda.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambda.swift @@ -14,6 +14,7 @@ import Dispatch import Logging +import NIOCore extension Lambda { package static func runLoop( @@ -44,4 +45,7 @@ extension Lambda { } } } + + /// The default EventLoop the Lambda is scheduled on. + package static var defaultEventLoop: any EventLoop = NIOSingletons.posixEventLoopGroup.next() } diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaHandlers.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaHandlers.swift index 0f9e7412..2464a486 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaHandlers.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaHandlers.swift @@ -171,3 +171,70 @@ package struct ClosureHandler: NewLambdaHandler { try await self.body(event, context) } } + +extension NewLambdaRuntime { + /// Initialize an instance with a ``StreamingLambdaHandler`` in the form of a closure. + /// - Parameter body: The handler in the form of a closure. + package convenience init( + body: @Sendable @escaping (ByteBuffer, LambdaResponseStreamWriter, NewLambdaContext) async throws -> Void + ) where Handler == StreamingClosureHandler { + self.init(handler: StreamingClosureHandler(body: body)) + } + + /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**, an encoder, and a decoder. + /// - Parameter body: The handler in the form of a closure. + /// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. + /// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. + package convenience init< + Event: Decodable, + Output: Encodable, + Encoder: LambdaOutputEncoder, + Decoder: LambdaEventDecoder + >( + encoder: Encoder, + decoder: Decoder, + body: @escaping (Event, NewLambdaContext) async throws -> Output + ) + where + Handler == LambdaCodableAdapter< + LambdaHandlerAdapter>, + Event, + Output, + Decoder, + Encoder + > + { + let handler = LambdaCodableAdapter( + encoder: encoder, + decoder: decoder, + handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) + ) + + self.init(handler: handler) + } + + /// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**, an encoder, and a decoder. + /// - Parameter body: The handler in the form of a closure. + /// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``. + /// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type. + package convenience init( + decoder: Decoder, + body: @escaping (Event, NewLambdaContext) async throws -> Void + ) + where + Handler == LambdaCodableAdapter< + LambdaHandlerAdapter>, + Event, + Void, + Decoder, + VoidEncoder + > + { + let handler = LambdaCodableAdapter( + decoder: decoder, + handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body)) + ) + + self.init(handler: handler) + } +} diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift new file mode 100644 index 00000000..41212b22 --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift @@ -0,0 +1,70 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import Logging +import NIOCore +import NIOConcurrencyHelpers + +// We need `@unchecked` Sendable here, as `NIOLockedValueBox` does not understand `sending` today. +// We don't want to use `NIOLockedValueBox` here anyway. We would love to use Mutex here, but this +// sadly crashes the compiler today. +package final class NewLambdaRuntime: @unchecked Sendable where Handler: StreamingLambdaHandler { + // TODO: We want to change this to Mutex as soon as this doesn't crash the Swift compiler on Linux anymore + let handlerMutex: NIOLockedValueBox> + let logger: Logger + let eventLoop: EventLoop + + package init( + handler: sending Handler, + eventLoop: EventLoop = Lambda.defaultEventLoop, + logger: Logger = Logger(label: "LambdaRuntime") + ) { + self.handlerMutex = NIOLockedValueBox(handler) + self.eventLoop = eventLoop + self.logger = logger + } + + package func run() async throws { + guard let runtimeEndpoint = Lambda.env("AWS_LAMBDA_RUNTIME_API") else { + throw NewLambdaRuntimeError(code: .missingLambdaRuntimeAPIEnvironmentVariable) + } + + let ipAndPort = runtimeEndpoint.split(separator: ":", maxSplits: 1) + let ip = String(ipAndPort[0]) + guard let port = Int(ipAndPort[1]) else { throw NewLambdaRuntimeError(code: .invalidPort) } + + let handler = self.handlerMutex.withLockedValue { handler in + let result = handler + handler = nil + return result + } + + guard let handler else { + throw NewLambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce) + } + + try await NewLambdaRuntimeClient.withRuntimeClient( + configuration: .init(ip: ip, port: port), + eventLoop: self.eventLoop, + logger: self.logger + ) { runtimeClient in + try await Lambda.runLoop( + runtimeClient: runtimeClient, + handler: handler, + logger: self.logger + ) + } + } +} diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift index 98db7bf9..46199e98 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift @@ -303,7 +303,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol { NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024) ) try channel.pipeline.syncOperations.addHandler( - LambdaChannelHandler(delegate: self, logger: self.logger) + LambdaChannelHandler(delegate: self, logger: self.logger, configuration: self.configuration) ) return channel.eventLoop.makeSucceededFuture(()) } catch { @@ -433,10 +433,33 @@ private final class LambdaChannelHandler private var reusableErrorBuffer: ByteBuffer? private let logger: Logger private let delegate: Delegate + private let configuration: NewLambdaRuntimeClient.Configuration - init(delegate: Delegate, logger: Logger) { + /// These are the default headers that must be sent along an invocation + let defaultHeaders: HTTPHeaders + /// These headers must be sent along an invocation or initialization error report + let errorHeaders: HTTPHeaders + /// These headers must be sent when streaming a response + let streamingHeaders: HTTPHeaders + + init(delegate: Delegate, logger: Logger, configuration: NewLambdaRuntimeClient.Configuration) { self.delegate = delegate self.logger = logger + self.configuration = configuration + self.defaultHeaders = [ + "host": "\(self.configuration.ip):\(self.configuration.port)", + "user-agent": "Swift-Lambda/Unknown", + ] + self.errorHeaders = [ + "host": "\(self.configuration.ip):\(self.configuration.port)", + "user-agent": "Swift-Lambda/Unknown", + "lambda-runtime-function-error-type": "Unhandled", + ] + self.streamingHeaders = [ + "host": "\(self.configuration.ip):\(self.configuration.port)", + "user-agent": "Swift-Lambda/Unknown", + "transfer-encoding": "chunked", + ] } func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation { @@ -578,7 +601,7 @@ private final class LambdaChannelHandler version: .http1_1, method: .POST, uri: url, - headers: NewLambdaRuntimeClient.streamingHeaders + headers: self.streamingHeaders ) context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil) @@ -604,11 +627,12 @@ private final class LambdaChannelHandler let headers: HTTPHeaders = if byteBuffer?.readableBytes ?? 0 < 6_000_000 { [ + "host": "\(self.configuration.ip):\(self.configuration.port)", "user-agent": "Swift-Lambda/Unknown", "content-length": "\(byteBuffer?.readableBytes ?? 0)", ] } else { - NewLambdaRuntimeClient.streamingHeaders + self.streamingHeaders } let httpRequest = HTTPRequestHead( @@ -634,7 +658,7 @@ private final class LambdaChannelHandler version: .http1_1, method: .GET, uri: self.nextInvocationPath, - headers: NewLambdaRuntimeClient.defaultHeaders + headers: self.defaultHeaders ) context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil) @@ -650,7 +674,7 @@ private final class LambdaChannelHandler version: .http1_1, method: .POST, uri: url, - headers: NewLambdaRuntimeClient.errorHeaders + headers: self.errorHeaders ) if self.reusableErrorBuffer == nil { @@ -797,22 +821,3 @@ extension LambdaChannelHandler: ChannelInboundHandler { context.fireChannelInactive() } } - -extension NewLambdaRuntimeClient { - static let defaultHeaders: HTTPHeaders = [ - "user-agent": "Swift-Lambda/Unknown" - ] - - /// These headers must be sent along an invocation or initialization error report - static let errorHeaders: HTTPHeaders = [ - "user-agent": "Swift-Lambda/Unknown", - "lambda-runtime-function-error-type": "Unhandled", - ] - - /// These headers must be sent along an invocation or initialization error report - static let streamingHeaders: HTTPHeaders = [ - "user-agent": "Swift-Lambda/Unknown", - "transfer-encoding": "streaming", - ] - -} diff --git a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift index 2468ef99..02a4d8b8 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift @@ -29,6 +29,9 @@ package struct NewLambdaRuntimeError: Error { case nextInvocationMissingHeaderDeadline case nextInvocationMissingHeaderInvokeFuctionARN + case missingLambdaRuntimeAPIEnvironmentVariable + case runtimeCanOnlyBeStartedOnce + case invalidPort } package init(code: Code, underlying: (any Error)? = nil) {