Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new LambdaRuntime #353

Merged
merged 8 commits into from
Sep 5, 2024
114 changes: 114 additions & 0 deletions Sources/AWSLambdaRuntime/Lambda+Codable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,117 @@ extension LambdaCodableAdapter {
)
}
}

extension NewLambdaRuntime {
/// Initialize an instance with a ``StreamingLambdaHandler`` in the form of a closure.
/// - Parameter body: The handler in the form of a closure.
package convenience init(
body: @Sendable @escaping (ByteBuffer, LambdaResponseStreamWriter, NewLambdaContext) async throws -> Void
) where Handler == StreamingClosureHandler {
self.init(handler: StreamingClosureHandler(body: body))
}
aryan-25 marked this conversation as resolved.
Show resolved Hide resolved

/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**, an encoder, and a decoder.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``.
/// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type.
package convenience init<
Event: Decodable,
Output: Encodable,
Encoder: LambdaOutputEncoder,
Decoder: LambdaEventDecoder
>(
encoder: Encoder,
decoder: Decoder,
body: @escaping (Event, NewLambdaContext) async throws -> Output
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Output, ClosureHandler<Event, Output>>,
Event,
Output,
Decoder,
Encoder
>
{
let handler = LambdaCodableAdapter(
encoder: encoder,
decoder: decoder,
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}

/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**, an encoder, and a decoder.
/// - Parameter body: The handler in the form of a closure.
/// - Parameter encoder: The encoder object that will be used to encode the generic ``Output`` into a ``ByteBuffer``.
/// - Parameter decoder: The decoder object that will be used to decode the incoming ``ByteBuffer`` event into the generic ``Event`` type.
package convenience init<Event: Decodable, Decoder: LambdaEventDecoder>(
decoder: Decoder,
body: @escaping (Event, NewLambdaContext) async throws -> Void
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Void, ClosureHandler<Event, Void>>,
Event,
Void,
Decoder,
VoidEncoder
>
{
let handler = LambdaCodableAdapter(
decoder: decoder,
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}

/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a non-`Void` return type**.
/// - note: ``JSONEncoder`` and ``JSONDecoder`` are used as the encoder and decoder objects. Use ``init(encoder:decoder:body:)`` to specify custom encoder and decoder objects.
/// - Parameter body: The handler in the form of a closure.
package convenience init<Event: Decodable, Output>(
body: @escaping (Event, NewLambdaContext) async throws -> Output
aryan-25 marked this conversation as resolved.
Show resolved Hide resolved
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Output, ClosureHandler<Event, Output>>,
Event,
Output,
JSONDecoder,
LambdaJSONOutputEncoder<Output>
>
{
let handler = LambdaCodableAdapter(
encoder: JSONEncoder(),
decoder: JSONDecoder(),
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}

/// Initialize an instance with a ``NewLambdaHandler`` defined in the form of a closure **with a `Void` return type**.
/// - note: ``JSONDecoder`` is used as the decoder object. Use ``init(decoder:body:)`` to specify a custom decoder object.
/// - Parameter body: The handler in the form of a closure.
package convenience init<Event: Decodable>(
body: @escaping (Event, NewLambdaContext) async throws -> Void
aryan-25 marked this conversation as resolved.
Show resolved Hide resolved
)
where
Handler == LambdaCodableAdapter<
LambdaHandlerAdapter<Event, Void, ClosureHandler<Event, Void>>,
Event,
Void,
JSONDecoder,
VoidEncoder
>
{
let handler = LambdaCodableAdapter(
decoder: JSONDecoder(),
handler: LambdaHandlerAdapter(handler: ClosureHandler(body: body))
)

self.init(handler: handler)
}
}
4 changes: 4 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import Dispatch
import Logging
import NIOCore

extension Lambda {
package static func runLoop<RuntimeClient: LambdaRuntimeClientProtocol, Handler>(
Expand Down Expand Up @@ -44,4 +45,7 @@ extension Lambda {
}
}
}

/// The default EventLoop the Lambda is scheduled on.
package static var defaultEventLoop: any EventLoop = NIOSingletons.posixEventLoopGroup.next()
}
68 changes: 68 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntime.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Foundation
//import ServiceLifecycle
import Logging
import NIOCore
import Synchronization

package final class NewLambdaRuntime<Handler>: Sendable where Handler: StreamingLambdaHandler {
let handlerMutex: Mutex<Handler?>
let logger: Logger
let eventLoop: EventLoop

package init(
handler: sending Handler,
eventLoop: EventLoop = Lambda.defaultEventLoop,
logger: Logger = Logger(label: "LambdaRuntime")
) {
self.handlerMutex = Mutex(handler)
self.eventLoop = eventLoop
self.logger = logger
}

package func run() async throws {
guard let runtimeEndpoint = Lambda.env("AWS_LAMBDA_RUNTIME_API") else {
throw NewLambdaRuntimeError(code: .cannotStartLambdaRuntime)
aryan-25 marked this conversation as resolved.
Show resolved Hide resolved
}

let ipAndPort = runtimeEndpoint.split(separator: ":", maxSplits: 1)
let ip = String(ipAndPort[0])
let port = Int(ipAndPort[1])!
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should throw an error here, if the second part is not an Int. .invalidPort.


let handler = self.handlerMutex.withLock { maybeHandler in
defer {
maybeHandler = nil
}
return maybeHandler
}

guard let handler else {
throw NewLambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce)
}

try await NewLambdaRuntimeClient.withRuntimeClient(
configuration: .init(ip: ip, port: port),
eventLoop: self.eventLoop,
logger: self.logger
) { runtimeClient in
try await Lambda.runLoop(
runtimeClient: runtimeClient,
handler: handler,
logger: self.logger
)
}
}
}
54 changes: 29 additions & 25 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ final actor NewLambdaRuntimeClient: LambdaRuntimeClientProtocol {
NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024)
)
try channel.pipeline.syncOperations.addHandler(
LambdaChannelHandler(delegate: self, logger: self.logger)
LambdaChannelHandler(delegate: self, logger: self.logger, configuration: self.configuration)
)
return channel.eventLoop.makeSucceededFuture(())
} catch {
Expand Down Expand Up @@ -433,10 +433,32 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
private var reusableErrorBuffer: ByteBuffer?
private let logger: Logger
private let delegate: Delegate
private let configuration: NewLambdaRuntimeClient.Configuration

init(delegate: Delegate, logger: Logger) {
let defaultHeaders: HTTPHeaders
/// These headers must be sent along an invocation or initialization error report
let errorHeaders: HTTPHeaders
/// These headers must be sent along an invocation or initialization error report
Copy link
Member

@fabianfett fabianfett Sep 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code comment is not true.

let streamingHeaders: HTTPHeaders

init(delegate: Delegate, logger: Logger, configuration: NewLambdaRuntimeClient.Configuration) {
self.delegate = delegate
self.logger = logger
self.configuration = configuration
self.defaultHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
]
self.errorHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"lambda-runtime-function-error-type": "Unhandled",
]
self.streamingHeaders = [
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"transfer-encoding": "streaming",
aryan-25 marked this conversation as resolved.
Show resolved Hide resolved
]
}

func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation {
Expand Down Expand Up @@ -578,7 +600,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .POST,
uri: url,
headers: NewLambdaRuntimeClient.streamingHeaders
headers: self.streamingHeaders
)

context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
Expand All @@ -604,11 +626,12 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
let headers: HTTPHeaders =
if byteBuffer?.readableBytes ?? 0 < 6_000_000 {
[
"host": "\(self.configuration.ip):\(self.configuration.port)",
"user-agent": "Swift-Lambda/Unknown",
"content-length": "\(byteBuffer?.readableBytes ?? 0)",
]
} else {
NewLambdaRuntimeClient.streamingHeaders
self.streamingHeaders
}

let httpRequest = HTTPRequestHead(
Expand All @@ -634,7 +657,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .GET,
uri: self.nextInvocationPath,
headers: NewLambdaRuntimeClient.defaultHeaders
headers: self.defaultHeaders
)

context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
Expand All @@ -650,7 +673,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
version: .http1_1,
method: .POST,
uri: url,
headers: NewLambdaRuntimeClient.errorHeaders
headers: self.errorHeaders
)

if self.reusableErrorBuffer == nil {
Expand Down Expand Up @@ -797,22 +820,3 @@ extension LambdaChannelHandler: ChannelInboundHandler {
context.fireChannelInactive()
}
}

extension NewLambdaRuntimeClient {
static let defaultHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown"
]

/// These headers must be sent along an invocation or initialization error report
static let errorHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown",
"lambda-runtime-function-error-type": "Unhandled",
]

/// These headers must be sent along an invocation or initialization error report
static let streamingHeaders: HTTPHeaders = [
"user-agent": "Swift-Lambda/Unknown",
"transfer-encoding": "streaming",
]

}
2 changes: 2 additions & 0 deletions Sources/AWSLambdaRuntimeCore/NewLambdaRuntimeError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ package struct NewLambdaRuntimeError: Error {
case nextInvocationMissingHeaderDeadline
case nextInvocationMissingHeaderInvokeFuctionARN

case cannotStartLambdaRuntime
case runtimeCanOnlyBeStartedOnce
}

package init(code: Code, underlying: (any Error)? = nil) {
Expand Down
Loading