diff --git a/Sources/AWSLambdaRuntimeCore/NewLambda.swift b/Sources/AWSLambdaRuntimeCore/NewLambda.swift new file mode 100644 index 00000000..5adb8f57 --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/NewLambda.swift @@ -0,0 +1,56 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Dispatch +import Logging +import NIOCore + +package protocol StreamingLambdaHandler { + mutating func handle( + _ event: ByteBuffer, + responseWriter: some LambdaResponseStreamWriter, + context: NewLambdaContext + ) async throws +} + +extension Lambda { + package static func runLoop( + runtimeClient: RuntimeClient, + handler: Handler, + logger: Logger + ) async throws where Handler: StreamingLambdaHandler { + var handler = handler + + while !Task.isCancelled { + let (invocation, writer) = try await runtimeClient.nextInvocation() + + do { + try await handler.handle( + invocation.event, + responseWriter: writer, + context: NewLambdaContext( + requestID: invocation.metadata.requestID, + traceID: invocation.metadata.traceID, + invokedFunctionARN: invocation.metadata.invokedFunctionARN, + deadline: DispatchWallTime(millisSinceEpoch: invocation.metadata.deadlineInMillisSinceEpoch), + logger: logger + ) + ) + } catch { + try await writer.reportError(error) + continue + } + } + } +} diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift new file mode 100644 index 00000000..4023dc76 --- /dev/null +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift @@ -0,0 +1,295 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import AWSLambdaRuntimeCore +import Foundation +import Logging +import NIOCore + +struct LambdaMockWriter: LambdaResponseStreamWriter { + var underlying: LambdaMockClient + + init(underlying: LambdaMockClient) { + self.underlying = underlying + } + + mutating func write(_ buffer: ByteBuffer) async throws { + try await self.underlying.write(buffer) + } + + func finish() async throws { + try await self.underlying.finish() + } + + func writeAndFinish(_ buffer: ByteBuffer) async throws { + try await self.underlying.write(buffer) + try await self.underlying.finish() + } + + func reportError(_ error: any Error) async throws { + await self.underlying.reportError(error) + } +} + +enum LambdaError: Error, Equatable { + case cannotCallNextEndpointWhenAlreadyWaitingForEvent + case cannotCallNextEndpointWhenAlreadyProcessingAnEvent + case cannotReportResultWhenNoEventHasBeenProcessed + case cancelError + case handlerError +} + +final actor LambdaMockClient: LambdaRuntimeClientProtocol { + typealias Writer = LambdaMockWriter + + private struct StateMachine { + private enum State { + // The Lambda has just started, or an event has finished processing and the runtime is ready to receive more events. + // Expecting a next() call by the runtime. + case initialState + + // The next endpoint has been called but no event has arrived yet. + case waitingForNextEvent(eventArrivedHandler: CheckedContinuation) + + // The handler is processing the event. Buffers written to the writer are accumulated. + case handlerIsProcessing( + accumulatedResponse: [ByteBuffer], + eventProcessedHandler: CheckedContinuation + ) + } + + private var state: State = .initialState + + // Queue incoming events if the runtime is busy handling an event. + private var eventQueue = [Event]() + + enum InvokeAction { + // The next endpoint is waiting for an event. Deliver this newly arrived event to it. + case readyToProcess(_ eventArrivedHandler: CheckedContinuation) + + // The next endpoint has not been called yet. This event has been added to the queue. + case wait + } + + enum NextAction { + // There is an event available to be processed. + case readyToProcess(Invocation) + + // No events available yet. Wait for an event to arrive. + case wait + + case fail(LambdaError) + } + + enum CancelNextAction { + case none + + case cancelContinuation(CheckedContinuation) + } + + enum ResultAction { + case readyForMore + + case fail(LambdaError) + } + + enum FailProcessingAction { + case none + + case throwContinuation(CheckedContinuation) + } + + mutating func next(_ eventArrivedHandler: CheckedContinuation) -> NextAction { + switch self.state { + case .initialState: + if self.eventQueue.isEmpty { + // No event available yet -- store the continuation for the next invoke() call. + self.state = .waitingForNextEvent(eventArrivedHandler: eventArrivedHandler) + return .wait + } else { + // An event is already waiting to be processed + let event = self.eventQueue.removeFirst() // TODO: use Deque + + self.state = .handlerIsProcessing( + accumulatedResponse: [], + eventProcessedHandler: event.eventProcessedHandler + ) + return .readyToProcess(event.invocation) + } + case .waitingForNextEvent: + return .fail(.cannotCallNextEndpointWhenAlreadyWaitingForEvent) + case .handlerIsProcessing: + return .fail(.cannotCallNextEndpointWhenAlreadyProcessingAnEvent) + } + } + + mutating func invoke(_ event: Event) -> InvokeAction { + switch self.state { + case .initialState, .handlerIsProcessing: + // next() hasn't been called yet. Add to the event queue. + self.eventQueue.append(event) + return .wait + case .waitingForNextEvent(let eventArrivedHandler): + // The runtime is already waiting for an event + self.state = .handlerIsProcessing( + accumulatedResponse: [], + eventProcessedHandler: event.eventProcessedHandler + ) + return .readyToProcess(eventArrivedHandler) + } + } + + mutating func writeResult(buffer: ByteBuffer) -> ResultAction { + switch self.state { + case .handlerIsProcessing(var accumulatedResponse, let eventProcessedHandler): + accumulatedResponse.append(buffer) + self.state = .handlerIsProcessing( + accumulatedResponse: accumulatedResponse, + eventProcessedHandler: eventProcessedHandler + ) + return .readyForMore + case .initialState, .waitingForNextEvent: + return .fail(.cannotReportResultWhenNoEventHasBeenProcessed) + } + } + + mutating func finish() throws { + switch self.state { + case .handlerIsProcessing(let accumulatedResponse, let eventProcessedHandler): + let finalResult: ByteBuffer = accumulatedResponse.reduce(ByteBuffer()) { (accumulated, current) in + var accumulated = accumulated + accumulated.writeBytes(current.readableBytesView) + return accumulated + } + + eventProcessedHandler.resume(returning: finalResult) + // reset back to the initial state + self.state = .initialState + case .initialState, .waitingForNextEvent: + throw LambdaError.cannotReportResultWhenNoEventHasBeenProcessed + } + } + + mutating func cancelNext() -> CancelNextAction { + switch self.state { + case .initialState, .handlerIsProcessing: + return .none + case .waitingForNextEvent(let eventArrivedHandler): + self.state = .initialState + return .cancelContinuation(eventArrivedHandler) + } + } + + mutating func failProcessing() -> FailProcessingAction { + switch self.state { + case .initialState, .waitingForNextEvent: + // Cannot report an error for an event if the event is not currently being processed. + fatalError() + case .handlerIsProcessing(_, let eventProcessedHandler): + return .throwContinuation(eventProcessedHandler) + } + } + } + + private var stateMachine = StateMachine() + + struct Event { + let invocation: Invocation + let eventProcessedHandler: CheckedContinuation + } + + func invoke(event: ByteBuffer) async throws -> ByteBuffer { + try await withCheckedThrowingContinuation { eventProcessedHandler in + do { + let metadata = try InvocationMetadata( + headers: .init([ + ("Lambda-Runtime-Aws-Request-Id", "100"), // arbitrary values + ("Lambda-Runtime-Deadline-Ms", "100"), + ("Lambda-Runtime-Invoked-Function-Arn", "100"), + ]) + ) + let invocation = Invocation(metadata: metadata, event: event) + + let invokeAction = self.stateMachine.invoke( + Event( + invocation: invocation, + eventProcessedHandler: eventProcessedHandler + ) + ) + + switch invokeAction { + case .readyToProcess(let eventArrivedHandler): + // nextInvocation had been called earlier and is currently waiting for an event; deliver + eventArrivedHandler.resume(returning: invocation) + case .wait: + // The event has been added to the event queue; wait for it to be picked up + break + } + } catch { + eventProcessedHandler.resume(throwing: error) + } + } + } + + func nextInvocation() async throws -> (Invocation, Writer) { + try await withTaskCancellationHandler { + let invocation = try await withCheckedThrowingContinuation { eventArrivedHandler in + switch self.stateMachine.next(eventArrivedHandler) { + case .readyToProcess(let event): + eventArrivedHandler.resume(returning: event) + case .fail(let error): + eventArrivedHandler.resume(throwing: error) + case .wait: + break + } + } + return (invocation, Writer(underlying: self)) + } onCancel: { + Task { + await self.cancelNextInvocation() + } + } + } + + private func cancelNextInvocation() { + switch self.stateMachine.cancelNext() { + case .none: + break + case .cancelContinuation(let continuation): + continuation.resume(throwing: LambdaError.cancelError) + } + } + + func write(_ buffer: ByteBuffer) async throws { + switch self.stateMachine.writeResult(buffer: buffer) { + case .readyForMore: + break + case .fail(let error): + throw error + } + } + + func finish() async throws { + try self.stateMachine.finish() + } + + func reportError(_ error: any Error) { + switch self.stateMachine.failProcessing() { + case .none: + break + case .throwContinuation(let continuation): + continuation.resume(throwing: error) + } + } +} diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaRunLoopTests.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaRunLoopTests.swift new file mode 100644 index 00000000..d85d7f92 --- /dev/null +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaRunLoopTests.swift @@ -0,0 +1,89 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import Logging +import NIOCore +import Testing + +@testable import AWSLambdaRuntimeCore + +@Suite +struct LambdaRunLoopTests { + struct MockEchoHandler: StreamingLambdaHandler { + func handle( + _ event: ByteBuffer, + responseWriter: some LambdaResponseStreamWriter, + context: NewLambdaContext + ) async throws { + try await responseWriter.writeAndFinish(event) + } + } + + struct FailingHandler: StreamingLambdaHandler { + func handle( + _ event: ByteBuffer, + responseWriter: some LambdaResponseStreamWriter, + context: NewLambdaContext + ) async throws { + throw LambdaError.handlerError + } + } + + let mockClient = LambdaMockClient() + let mockEchoHandler = MockEchoHandler() + let failingHandler = FailingHandler() + + @Test func testRunLoop() async throws { + let inputEvent = ByteBuffer(string: "Test Invocation Event") + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await Lambda.runLoop( + runtimeClient: self.mockClient, + handler: self.mockEchoHandler, + logger: Logger(label: "RunLoopTest") + ) + } + + let response = try await self.mockClient.invoke(event: inputEvent) + #expect(response == inputEvent) + + group.cancelAll() + } + } + + @Test func testRunLoopError() async throws { + let inputEvent = ByteBuffer(string: "Test Invocation Event") + + await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await Lambda.runLoop( + runtimeClient: self.mockClient, + handler: self.failingHandler, + logger: Logger(label: "RunLoopTest") + ) + } + + await #expect( + throws: LambdaError.handlerError, + performing: { + try await self.mockClient.invoke(event: inputEvent) + } + ) + + group.cancelAll() + } + } +}