From 71f18eb8ff9828524a69556a98942df028590d78 Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Thu, 29 Aug 2024 12:28:13 +0100 Subject: [PATCH 01/12] Add new Lambda + runLoop function --- Sources/AWSLambdaRuntimeCore/NewLambda.swift | 56 ++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 Sources/AWSLambdaRuntimeCore/NewLambda.swift diff --git a/Sources/AWSLambdaRuntimeCore/NewLambda.swift b/Sources/AWSLambdaRuntimeCore/NewLambda.swift new file mode 100644 index 00000000..f945c1dd --- /dev/null +++ b/Sources/AWSLambdaRuntimeCore/NewLambda.swift @@ -0,0 +1,56 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Dispatch +import Logging +import NIOCore + +package protocol StreamingLambdaHandler { + mutating func handle( + _ event: ByteBuffer, + responseWriter: some LambdaResponseStreamWriter, + context: NewLambdaContext + ) async throws +} + +package enum NewLambda { + package static func runLoop( + runtimeClient: RuntimeClient, + handler: Handler, + logger: Logger + ) async throws + where Handler: StreamingLambdaHandler { + while !Task.isCancelled { + let (invocation, writer) = try await runtimeClient.nextInvocation() + + var handler = handler + do { + try await handler.handle( + invocation.event, + responseWriter: writer, + context: NewLambdaContext( + requestID: invocation.metadata.requestID, + traceID: invocation.metadata.traceID, + invokedFunctionARN: invocation.metadata.invokedFunctionARN, + deadline: DispatchWallTime(millisSinceEpoch: invocation.metadata.deadlineInMillisSinceEpoch), + logger: logger + ) + ) + } catch { + try await writer.reportError(error) + continue + } + } + } +} From d496cbbb299bef1c5720711b8739da01c7451c7f Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Thu, 29 Aug 2024 14:27:33 +0100 Subject: [PATCH 02/12] Extend on existing Lambda enum instead + refactor --- Sources/AWSLambdaRuntimeCore/NewLambda.swift | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Sources/AWSLambdaRuntimeCore/NewLambda.swift b/Sources/AWSLambdaRuntimeCore/NewLambda.swift index f945c1dd..5adb8f57 100644 --- a/Sources/AWSLambdaRuntimeCore/NewLambda.swift +++ b/Sources/AWSLambdaRuntimeCore/NewLambda.swift @@ -24,17 +24,17 @@ package protocol StreamingLambdaHandler { ) async throws } -package enum NewLambda { +extension Lambda { package static func runLoop( runtimeClient: RuntimeClient, handler: Handler, logger: Logger - ) async throws - where Handler: StreamingLambdaHandler { + ) async throws where Handler: StreamingLambdaHandler { + var handler = handler + while !Task.isCancelled { let (invocation, writer) = try await runtimeClient.nextInvocation() - var handler = handler do { try await handler.handle( invocation.event, From 039d4f3953c5d9d5720f6965573d2b29a7871a85 Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Fri, 30 Aug 2024 15:21:57 +0100 Subject: [PATCH 03/12] Add mock client + test for runLoop --- .../AWSLambdaTesting/LambdaMockClient.swift | 266 ++++++++++++++++++ .../AWSLambdaTesting/LambdaRunLoopTests.swift | 36 +++ 2 files changed, 302 insertions(+) create mode 100644 Sources/AWSLambdaTesting/LambdaMockClient.swift create mode 100644 Sources/AWSLambdaTesting/LambdaRunLoopTests.swift diff --git a/Sources/AWSLambdaTesting/LambdaMockClient.swift b/Sources/AWSLambdaTesting/LambdaMockClient.swift new file mode 100644 index 00000000..137928e2 --- /dev/null +++ b/Sources/AWSLambdaTesting/LambdaMockClient.swift @@ -0,0 +1,266 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import AWSLambdaRuntimeCore +import Foundation +import Logging +import NIOCore + +package struct LambdaMockWriter: LambdaResponseStreamWriter { + var underlying: LambdaMockClient + + package init(underlying: LambdaMockClient) { + self.underlying = underlying + } + + package mutating func write(_ buffer: ByteBuffer) async throws { + try await self.underlying.write(buffer) + } + + package consuming func finish() async throws { + try await self.underlying.finish() + } + + package consuming func writeAndFinish(_ buffer: ByteBuffer) async throws { + try await self.write(buffer) + try await self.finish() + } + + package func reportError(_ error: any Error) async throws { + } +} + +package struct LambdaError: Error, Equatable { + private enum Code: Equatable { + case cannotCallNextEndpointWhenAlreadyWaitingForEvent + case cannotCallNextEndpointWhenAlreadyProcessingAnEvent + case cannotReportResultWhenNoEventHasBeenProcessed + } + + private let code: Code + + private init(code: Code) { + self.code = code + } + + package func shortDescription() -> String { + switch self.code { + case .cannotCallNextEndpointWhenAlreadyWaitingForEvent: + "Cannot call the next endpoint when already waiting for an event" + case .cannotCallNextEndpointWhenAlreadyProcessingAnEvent: + "Cannot call the next endpoint when an event is already being processed" + case .cannotReportResultWhenNoEventHasBeenProcessed: + "Cannot report a result when no event has been processed" + } + } + + package static let cannotCallNextEndpointWhenAlreadyWaitingForEvent = LambdaError( + code: .cannotCallNextEndpointWhenAlreadyWaitingForEvent + ) + package static let cannotCallNextEndpointWhenAlreadyProcessingAnEvent = LambdaError( + code: .cannotCallNextEndpointWhenAlreadyProcessingAnEvent + ) + package static let cannotReportResultWhenNoEventHasBeenProcessed = LambdaError( + code: .cannotReportResultWhenNoEventHasBeenProcessed + ) +} + +package final actor LambdaMockClient: LambdaRuntimeClientProtocol { + package typealias Writer = LambdaMockWriter + + private struct StateMachine { + private enum State { + // The Lambda has just started, or an event has finished processing and the runtime is ready to receive more events. + // Expecting a next() call by the runtime. + case initialState + + // The next endpoint has been called but no event has arrived yet. + case waitingForNextEvent(eventArrivedHandler: CheckedContinuation) + + // The handler is processing the event. Buffers written to the writer are accumulated. + case handlerIsProcessing( + accumulatedResponse: [ByteBuffer], + eventProcessedHandler: CheckedContinuation + ) + } + + private var state: State = .initialState + + // Queue incoming events if the runtime is busy handling an event. + private var eventQueue = [Event]() + + enum InvokeAction { + // The next endpoint is waiting for an event. Deliver this newly arrived event to it. + case readyToProcess(_ eventArrivedHandler: CheckedContinuation) + + // The next endpoint has not been called yet. This event has been added to the queue. + case wait + } + + enum NextAction { + // There is an event available to be processed. + case readyToProcess(Invocation) + + // No events available yet. Wait for an event to arrive. + case wait + + case fail(LambdaError) + } + + enum ResultAction { + case readyForMore + + case fail(LambdaError) + } + + mutating func next(_ eventArrivedHandler: CheckedContinuation) -> NextAction { + switch self.state { + case .initialState: + if self.eventQueue.isEmpty { + // No event available yet -- store the continuation for the next invoke() call. + self.state = .waitingForNextEvent(eventArrivedHandler: eventArrivedHandler) + return .wait + } else { + // An event is already waiting to be processed + let event = self.eventQueue.removeFirst() // TODO: use Deque + + self.state = .handlerIsProcessing( + accumulatedResponse: [], + eventProcessedHandler: event.eventProcessedHandler + ) + return .readyToProcess(event.invocation) + } + case .waitingForNextEvent: + return .fail(.cannotCallNextEndpointWhenAlreadyWaitingForEvent) + case .handlerIsProcessing: + return .fail(.cannotCallNextEndpointWhenAlreadyProcessingAnEvent) + } + } + + mutating func invoke(_ event: Event) -> InvokeAction { + switch self.state { + case .initialState, .handlerIsProcessing: + // next() hasn't been called yet. Add to the event queue. + self.eventQueue.append(event) + return .wait + case .waitingForNextEvent(let eventArrivedHandler): + // The runtime is already waiting for an event + self.state = .handlerIsProcessing( + accumulatedResponse: [], + eventProcessedHandler: event.eventProcessedHandler + ) + return .readyToProcess(eventArrivedHandler) + } + } + + mutating func writeResult(buffer: ByteBuffer) -> ResultAction { + switch self.state { + case .handlerIsProcessing(var accumulatedResponse, let eventProcessedHandler): + accumulatedResponse.append(buffer) + self.state = .handlerIsProcessing( + accumulatedResponse: accumulatedResponse, + eventProcessedHandler: eventProcessedHandler + ) + return .readyForMore + case .initialState, .waitingForNextEvent: + return .fail(.cannotReportResultWhenNoEventHasBeenProcessed) + } + } + + mutating func finish() throws { + switch self.state { + case .handlerIsProcessing(let accumulatedResponse, let eventProcessedHandler): + let finalResult: ByteBuffer = accumulatedResponse.reduce(ByteBuffer()) { (accumulated, current) in + var accumulated = accumulated + accumulated.writeBytes(current.readableBytesView) + return accumulated + } + + eventProcessedHandler.resume(returning: finalResult) + // reset back to the initial state + self.state = .initialState + case .initialState, .waitingForNextEvent: + throw LambdaError.cannotReportResultWhenNoEventHasBeenProcessed + } + } + } + + private var stateMachine: StateMachine = .init() + + struct Event { + let invocation: Invocation + let eventProcessedHandler: CheckedContinuation + } + + package func invoke(event: ByteBuffer) async throws -> ByteBuffer { + try await withCheckedThrowingContinuation { eventProcessedHandler in + do { + let metadata = try InvocationMetadata( + headers: .init([ + ("Lambda-Runtime-Aws-Request-Id", "100"), // arbitrary values + ("Lambda-Runtime-Deadline-Ms", "100"), + ("Lambda-Runtime-Invoked-Function-Arn", "100"), + ]) + ) + let invocation = Invocation(metadata: metadata, event: event) + + let invokeAction = self.stateMachine.invoke( + Event( + invocation: invocation, + eventProcessedHandler: eventProcessedHandler + ) + ) + + switch invokeAction { + case .readyToProcess(let eventArrivedHandler): + // nextInvocation had been called earlier and is currently waiting for an event; deliver + eventArrivedHandler.resume(returning: invocation) + case .wait: + // The event has been added to the event queue; wait for it to be picked up + break + } + } catch { + eventProcessedHandler.resume(throwing: error) + } + } + } + + package func nextInvocation() async throws -> (Invocation, Writer) { + let invocation = try await withCheckedThrowingContinuation { eventArrivedHandler in + switch self.stateMachine.next(eventArrivedHandler) { + case .readyToProcess(let event): + eventArrivedHandler.resume(returning: event) + case .fail(let error): + eventArrivedHandler.resume(throwing: error) + case .wait: + break + } + } + + return (invocation, Writer(underlying: self)) + } + + package func write(_ buffer: ByteBuffer) async throws { + switch self.stateMachine.writeResult(buffer: buffer) { + case .readyForMore: + break + case .fail(let error): + throw error + } + } + + package func finish() async throws { + try self.stateMachine.finish() + } +} diff --git a/Sources/AWSLambdaTesting/LambdaRunLoopTests.swift b/Sources/AWSLambdaTesting/LambdaRunLoopTests.swift new file mode 100644 index 00000000..875f1219 --- /dev/null +++ b/Sources/AWSLambdaTesting/LambdaRunLoopTests.swift @@ -0,0 +1,36 @@ +import Foundation +import Logging +import NIOCore +import Testing + +@testable import AWSLambdaRuntimeCore + +struct LambdaRunLoopTests { + struct MockEchoHandler: StreamingLambdaHandler { + func handle( + _ event: ByteBuffer, + responseWriter: some LambdaResponseStreamWriter, + context: NewLambdaContext + ) async throws { + try await responseWriter.writeAndFinish(event) + } + } + + let mockClient = LambdaMockClient() + let mockEchoHandler = MockEchoHandler() + + @Test func testRunLoop() async throws { + _ = Task { () in + try await Lambda.runLoop( + runtimeClient: self.mockClient, + handler: self.mockEchoHandler, + logger: Logger(label: "RunLoopTest") + ) + } + + let inputEvent = ByteBuffer(string: "Test Invocation Event") + let response = try await self.mockClient.invoke(event: inputEvent) + + #expect(response == inputEvent) + } +} From 4cb8a2cd4048ba91debdb04832c6a409b7a959f6 Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Fri, 30 Aug 2024 15:36:10 +0100 Subject: [PATCH 04/12] Add license header --- Sources/AWSLambdaTesting/LambdaRunLoopTests.swift | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/Sources/AWSLambdaTesting/LambdaRunLoopTests.swift b/Sources/AWSLambdaTesting/LambdaRunLoopTests.swift index 875f1219..25ccaba0 100644 --- a/Sources/AWSLambdaTesting/LambdaRunLoopTests.swift +++ b/Sources/AWSLambdaTesting/LambdaRunLoopTests.swift @@ -1,3 +1,17 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + import Foundation import Logging import NIOCore From 438b87dc413461058bb6fe0a82086fd230397c69 Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Fri, 30 Aug 2024 15:50:13 +0100 Subject: [PATCH 05/12] Refactor --- .../AWSLambdaTesting/LambdaMockClient.swift | 51 ++++--------------- .../AWSLambdaTesting/LambdaRunLoopTests.swift | 4 +- 2 files changed, 14 insertions(+), 41 deletions(-) diff --git a/Sources/AWSLambdaTesting/LambdaMockClient.swift b/Sources/AWSLambdaTesting/LambdaMockClient.swift index 137928e2..4744ba11 100644 --- a/Sources/AWSLambdaTesting/LambdaMockClient.swift +++ b/Sources/AWSLambdaTesting/LambdaMockClient.swift @@ -17,7 +17,7 @@ import Foundation import Logging import NIOCore -package struct LambdaMockWriter: LambdaResponseStreamWriter { +struct LambdaMockWriter: LambdaResponseStreamWriter { var underlying: LambdaMockClient package init(underlying: LambdaMockClient) { @@ -41,43 +41,14 @@ package struct LambdaMockWriter: LambdaResponseStreamWriter { } } -package struct LambdaError: Error, Equatable { - private enum Code: Equatable { - case cannotCallNextEndpointWhenAlreadyWaitingForEvent - case cannotCallNextEndpointWhenAlreadyProcessingAnEvent - case cannotReportResultWhenNoEventHasBeenProcessed - } - - private let code: Code - - private init(code: Code) { - self.code = code - } - - package func shortDescription() -> String { - switch self.code { - case .cannotCallNextEndpointWhenAlreadyWaitingForEvent: - "Cannot call the next endpoint when already waiting for an event" - case .cannotCallNextEndpointWhenAlreadyProcessingAnEvent: - "Cannot call the next endpoint when an event is already being processed" - case .cannotReportResultWhenNoEventHasBeenProcessed: - "Cannot report a result when no event has been processed" - } - } - - package static let cannotCallNextEndpointWhenAlreadyWaitingForEvent = LambdaError( - code: .cannotCallNextEndpointWhenAlreadyWaitingForEvent - ) - package static let cannotCallNextEndpointWhenAlreadyProcessingAnEvent = LambdaError( - code: .cannotCallNextEndpointWhenAlreadyProcessingAnEvent - ) - package static let cannotReportResultWhenNoEventHasBeenProcessed = LambdaError( - code: .cannotReportResultWhenNoEventHasBeenProcessed - ) +enum LambdaError: Error, Equatable { + case cannotCallNextEndpointWhenAlreadyWaitingForEvent + case cannotCallNextEndpointWhenAlreadyProcessingAnEvent + case cannotReportResultWhenNoEventHasBeenProcessed } -package final actor LambdaMockClient: LambdaRuntimeClientProtocol { - package typealias Writer = LambdaMockWriter +final actor LambdaMockClient: LambdaRuntimeClientProtocol { + typealias Writer = LambdaMockWriter private struct StateMachine { private enum State { @@ -203,7 +174,7 @@ package final actor LambdaMockClient: LambdaRuntimeClientProtocol { let eventProcessedHandler: CheckedContinuation } - package func invoke(event: ByteBuffer) async throws -> ByteBuffer { + func invoke(event: ByteBuffer) async throws -> ByteBuffer { try await withCheckedThrowingContinuation { eventProcessedHandler in do { let metadata = try InvocationMetadata( @@ -236,7 +207,7 @@ package final actor LambdaMockClient: LambdaRuntimeClientProtocol { } } - package func nextInvocation() async throws -> (Invocation, Writer) { + func nextInvocation() async throws -> (Invocation, Writer) { let invocation = try await withCheckedThrowingContinuation { eventArrivedHandler in switch self.stateMachine.next(eventArrivedHandler) { case .readyToProcess(let event): @@ -251,7 +222,7 @@ package final actor LambdaMockClient: LambdaRuntimeClientProtocol { return (invocation, Writer(underlying: self)) } - package func write(_ buffer: ByteBuffer) async throws { + func write(_ buffer: ByteBuffer) async throws { switch self.stateMachine.writeResult(buffer: buffer) { case .readyForMore: break @@ -260,7 +231,7 @@ package final actor LambdaMockClient: LambdaRuntimeClientProtocol { } } - package func finish() async throws { + func finish() async throws { try self.stateMachine.finish() } } diff --git a/Sources/AWSLambdaTesting/LambdaRunLoopTests.swift b/Sources/AWSLambdaTesting/LambdaRunLoopTests.swift index 25ccaba0..f27bf075 100644 --- a/Sources/AWSLambdaTesting/LambdaRunLoopTests.swift +++ b/Sources/AWSLambdaTesting/LambdaRunLoopTests.swift @@ -34,7 +34,7 @@ struct LambdaRunLoopTests { let mockEchoHandler = MockEchoHandler() @Test func testRunLoop() async throws { - _ = Task { () in + let runLoopTask = Task { () in try await Lambda.runLoop( runtimeClient: self.mockClient, handler: self.mockEchoHandler, @@ -45,6 +45,8 @@ struct LambdaRunLoopTests { let inputEvent = ByteBuffer(string: "Test Invocation Event") let response = try await self.mockClient.invoke(event: inputEvent) + runLoopTask.cancel() + #expect(response == inputEvent) } } From 258fd35d002f3559ad6786ddfe87263bf7a6968b Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Fri, 30 Aug 2024 16:42:46 +0100 Subject: [PATCH 06/12] Fix structure --- .../AWSLambdaRuntimeCoreTests}/LambdaMockClient.swift | 10 +++++----- .../LambdaRunLoopTests.swift | 1 + 2 files changed, 6 insertions(+), 5 deletions(-) rename {Sources/AWSLambdaTesting => Tests/AWSLambdaRuntimeCoreTests}/LambdaMockClient.swift (96%) rename {Sources/AWSLambdaTesting => Tests/AWSLambdaRuntimeCoreTests}/LambdaRunLoopTests.swift (99%) diff --git a/Sources/AWSLambdaTesting/LambdaMockClient.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift similarity index 96% rename from Sources/AWSLambdaTesting/LambdaMockClient.swift rename to Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift index 4744ba11..110fdb35 100644 --- a/Sources/AWSLambdaTesting/LambdaMockClient.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift @@ -20,24 +20,24 @@ import NIOCore struct LambdaMockWriter: LambdaResponseStreamWriter { var underlying: LambdaMockClient - package init(underlying: LambdaMockClient) { + init(underlying: LambdaMockClient) { self.underlying = underlying } - package mutating func write(_ buffer: ByteBuffer) async throws { + mutating func write(_ buffer: ByteBuffer) async throws { try await self.underlying.write(buffer) } - package consuming func finish() async throws { + consuming func finish() async throws { try await self.underlying.finish() } - package consuming func writeAndFinish(_ buffer: ByteBuffer) async throws { + consuming func writeAndFinish(_ buffer: ByteBuffer) async throws { try await self.write(buffer) try await self.finish() } - package func reportError(_ error: any Error) async throws { + func reportError(_ error: any Error) async throws { } } diff --git a/Sources/AWSLambdaTesting/LambdaRunLoopTests.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaRunLoopTests.swift similarity index 99% rename from Sources/AWSLambdaTesting/LambdaRunLoopTests.swift rename to Tests/AWSLambdaRuntimeCoreTests/LambdaRunLoopTests.swift index f27bf075..ea725756 100644 --- a/Sources/AWSLambdaTesting/LambdaRunLoopTests.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaRunLoopTests.swift @@ -19,6 +19,7 @@ import Testing @testable import AWSLambdaRuntimeCore +@Suite struct LambdaRunLoopTests { struct MockEchoHandler: StreamingLambdaHandler { func handle( From 8f6c5a8de0d5126cbbec9df922bf8f90097b94b4 Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Mon, 2 Sep 2024 10:50:33 +0100 Subject: [PATCH 07/12] Refactor + add test for reportError --- .../LambdaMockClient.swift | 60 +++++++++++++++---- .../LambdaRunLoopTests.swift | 50 +++++++++++++--- 2 files changed, 88 insertions(+), 22 deletions(-) diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift index 110fdb35..8d5ce7e8 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift @@ -28,16 +28,18 @@ struct LambdaMockWriter: LambdaResponseStreamWriter { try await self.underlying.write(buffer) } - consuming func finish() async throws { + func finish() async throws { try await self.underlying.finish() } - consuming func writeAndFinish(_ buffer: ByteBuffer) async throws { - try await self.write(buffer) - try await self.finish() + func writeAndFinish(_ buffer: ByteBuffer) async throws { + try await self.underlying.write(buffer) + try await self.underlying.finish() } func reportError(_ error: any Error) async throws { + try await self.underlying.write(ByteBuffer(string: "\(error)")) + try await self.underlying.finish() } } @@ -45,6 +47,8 @@ enum LambdaError: Error, Equatable { case cannotCallNextEndpointWhenAlreadyWaitingForEvent case cannotCallNextEndpointWhenAlreadyProcessingAnEvent case cannotReportResultWhenNoEventHasBeenProcessed + case cancelError + case handlerError } final actor LambdaMockClient: LambdaRuntimeClientProtocol { @@ -89,6 +93,12 @@ final actor LambdaMockClient: LambdaRuntimeClientProtocol { case fail(LambdaError) } + enum CancelNextAction { + case none + + case cancelContinuation(CheckedContinuation) + } + enum ResultAction { case readyForMore @@ -165,6 +175,16 @@ final actor LambdaMockClient: LambdaRuntimeClientProtocol { throw LambdaError.cannotReportResultWhenNoEventHasBeenProcessed } } + + mutating func cancelNext() -> CancelNextAction { + switch self.state { + case .initialState, .handlerIsProcessing: + return .none + case .waitingForNextEvent(let eventArrivedHandler): + self.state = .initialState + return .cancelContinuation(eventArrivedHandler) + } + } } private var stateMachine: StateMachine = .init() @@ -208,18 +228,32 @@ final actor LambdaMockClient: LambdaRuntimeClientProtocol { } func nextInvocation() async throws -> (Invocation, Writer) { - let invocation = try await withCheckedThrowingContinuation { eventArrivedHandler in - switch self.stateMachine.next(eventArrivedHandler) { - case .readyToProcess(let event): - eventArrivedHandler.resume(returning: event) - case .fail(let error): - eventArrivedHandler.resume(throwing: error) - case .wait: - break + try await withTaskCancellationHandler { + let invocation = try await withCheckedThrowingContinuation { eventArrivedHandler in + switch self.stateMachine.next(eventArrivedHandler) { + case .readyToProcess(let event): + eventArrivedHandler.resume(returning: event) + case .fail(let error): + eventArrivedHandler.resume(throwing: error) + case .wait: + break + } + } + return (invocation, Writer(underlying: self)) + } onCancel: { + Task { + await self.cancelNextInvocation() } } + } - return (invocation, Writer(underlying: self)) + private func cancelNextInvocation() { + switch self.stateMachine.cancelNext() { + case .none: + break + case .cancelContinuation(let continuation): + continuation.resume(throwing: LambdaError.cancelError) + } } func write(_ buffer: ByteBuffer) async throws { diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaRunLoopTests.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaRunLoopTests.swift index ea725756..e4235af1 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaRunLoopTests.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaRunLoopTests.swift @@ -31,23 +31,55 @@ struct LambdaRunLoopTests { } } + struct FailingHandler: StreamingLambdaHandler { + func handle( + _ event: ByteBuffer, + responseWriter: some LambdaResponseStreamWriter, + context: NewLambdaContext + ) async throws { + throw LambdaError.handlerError + } + } + let mockClient = LambdaMockClient() let mockEchoHandler = MockEchoHandler() + let failingHandler = FailingHandler() @Test func testRunLoop() async throws { - let runLoopTask = Task { () in - try await Lambda.runLoop( - runtimeClient: self.mockClient, - handler: self.mockEchoHandler, - logger: Logger(label: "RunLoopTest") - ) + let inputEvent = ByteBuffer(string: "Test Invocation Event") + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await Lambda.runLoop( + runtimeClient: self.mockClient, + handler: self.mockEchoHandler, + logger: Logger(label: "RunLoopTest") + ) + } + + let response = try await self.mockClient.invoke(event: inputEvent) + #expect(response == inputEvent) + + group.cancelAll() } + } + @Test func testRunLoopError() async throws { let inputEvent = ByteBuffer(string: "Test Invocation Event") - let response = try await self.mockClient.invoke(event: inputEvent) - runLoopTask.cancel() + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + try await Lambda.runLoop( + runtimeClient: self.mockClient, + handler: self.failingHandler, + logger: Logger(label: "RunLoopTest") + ) + } + + let response = try await self.mockClient.invoke(event: inputEvent) + #expect(String(buffer: response) == "\(LambdaError.handlerError)") - #expect(response == inputEvent) + group.cancelAll() + } } } From a1d1b43982e17f5bb807aa649c04ca1be022c58d Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Mon, 2 Sep 2024 11:10:33 +0100 Subject: [PATCH 08/12] Fix reportError --- .../LambdaMockClient.swift | 27 +++++++++++++++++-- .../LambdaRunLoopTests.swift | 10 ++++--- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift index 8d5ce7e8..ca82877c 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift @@ -38,8 +38,7 @@ struct LambdaMockWriter: LambdaResponseStreamWriter { } func reportError(_ error: any Error) async throws { - try await self.underlying.write(ByteBuffer(string: "\(error)")) - try await self.underlying.finish() + await self.underlying.reportError(error) } } @@ -105,6 +104,12 @@ final actor LambdaMockClient: LambdaRuntimeClientProtocol { case fail(LambdaError) } + enum CancelProcessingAction { + case none + + case cancelContinuation(CheckedContinuation) + } + mutating func next(_ eventArrivedHandler: CheckedContinuation) -> NextAction { switch self.state { case .initialState: @@ -185,6 +190,15 @@ final actor LambdaMockClient: LambdaRuntimeClientProtocol { return .cancelContinuation(eventArrivedHandler) } } + + mutating func cancelProcessing() -> CancelProcessingAction { + switch self.state { + case .initialState, .waitingForNextEvent: + return .none + case .handlerIsProcessing(_, let eventProcessedHandler): + return .cancelContinuation(eventProcessedHandler) + } + } } private var stateMachine: StateMachine = .init() @@ -268,4 +282,13 @@ final actor LambdaMockClient: LambdaRuntimeClientProtocol { func finish() async throws { try self.stateMachine.finish() } + + func reportError(_ error: any Error) { + switch self.stateMachine.cancelProcessing() { + case .none: + break + case .cancelContinuation(let continuation): + continuation.resume(throwing: error) + } + } } diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaRunLoopTests.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaRunLoopTests.swift index e4235af1..d85d7f92 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaRunLoopTests.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaRunLoopTests.swift @@ -67,7 +67,7 @@ struct LambdaRunLoopTests { @Test func testRunLoopError() async throws { let inputEvent = ByteBuffer(string: "Test Invocation Event") - try await withThrowingTaskGroup(of: Void.self) { group in + await withThrowingTaskGroup(of: Void.self) { group in group.addTask { try await Lambda.runLoop( runtimeClient: self.mockClient, @@ -76,8 +76,12 @@ struct LambdaRunLoopTests { ) } - let response = try await self.mockClient.invoke(event: inputEvent) - #expect(String(buffer: response) == "\(LambdaError.handlerError)") + await #expect( + throws: LambdaError.handlerError, + performing: { + try await self.mockClient.invoke(event: inputEvent) + } + ) group.cancelAll() } From b2c783eb7c27d9d377990a51e047709b7abcf502 Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Mon, 2 Sep 2024 11:14:21 +0100 Subject: [PATCH 09/12] Fix case name --- Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift index ca82877c..835e8223 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift @@ -104,10 +104,10 @@ final actor LambdaMockClient: LambdaRuntimeClientProtocol { case fail(LambdaError) } - enum CancelProcessingAction { + enum FailProcessingAction { case none - case cancelContinuation(CheckedContinuation) + case throwContinuation(CheckedContinuation) } mutating func next(_ eventArrivedHandler: CheckedContinuation) -> NextAction { @@ -191,12 +191,12 @@ final actor LambdaMockClient: LambdaRuntimeClientProtocol { } } - mutating func cancelProcessing() -> CancelProcessingAction { + mutating func cancelProcessing() -> FailProcessingAction { switch self.state { case .initialState, .waitingForNextEvent: return .none case .handlerIsProcessing(_, let eventProcessedHandler): - return .cancelContinuation(eventProcessedHandler) + return .throwContinuation(eventProcessedHandler) } } } @@ -287,7 +287,7 @@ final actor LambdaMockClient: LambdaRuntimeClientProtocol { switch self.stateMachine.cancelProcessing() { case .none: break - case .cancelContinuation(let continuation): + case .throwContinuation(let continuation): continuation.resume(throwing: error) } } From 147628f731bb82e9f57af31d75f94535cc99f02b Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Mon, 2 Sep 2024 11:16:31 +0100 Subject: [PATCH 10/12] Fix function name --- Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift index 835e8223..f8ce321f 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift @@ -191,7 +191,7 @@ final actor LambdaMockClient: LambdaRuntimeClientProtocol { } } - mutating func cancelProcessing() -> FailProcessingAction { + mutating func failProcessing() -> FailProcessingAction { switch self.state { case .initialState, .waitingForNextEvent: return .none @@ -284,7 +284,7 @@ final actor LambdaMockClient: LambdaRuntimeClientProtocol { } func reportError(_ error: any Error) { - switch self.stateMachine.cancelProcessing() { + switch self.stateMachine.failProcessing() { case .none: break case .throwContinuation(let continuation): From 1d7983687d3b95f9d929e0dac9a8fffda06d8888 Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Mon, 2 Sep 2024 11:49:09 +0100 Subject: [PATCH 11/12] Crash if reporting error when no event is being processed --- Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift index f8ce321f..a2396692 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift @@ -194,7 +194,8 @@ final actor LambdaMockClient: LambdaRuntimeClientProtocol { mutating func failProcessing() -> FailProcessingAction { switch self.state { case .initialState, .waitingForNextEvent: - return .none + // Cannot report an error for an event if the event is not currently being processed. + fatalError() case .handlerIsProcessing(_, let eventProcessedHandler): return .throwContinuation(eventProcessedHandler) } From 43d78bd66bf95f6bfdd9f7ec977d570d4c86b0d5 Mon Sep 17 00:00:00 2001 From: Aryan Shah Date: Mon, 2 Sep 2024 11:59:25 +0100 Subject: [PATCH 12/12] Refactor --- Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift index a2396692..4023dc76 100644 --- a/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaMockClient.swift @@ -202,7 +202,7 @@ final actor LambdaMockClient: LambdaRuntimeClientProtocol { } } - private var stateMachine: StateMachine = .init() + private var stateMachine = StateMachine() struct Event { let invocation: Invocation