diff --git a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift index fd92c639..70087a3a 100644 --- a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift +++ b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift @@ -490,11 +490,43 @@ extension Channel { mode: NIOHTTP2Handler.ParserMode, configuration: NIOHTTP2Handler.Configuration = .init(), streamInitializer: @escaping NIOChannelInitializerWithOutput + ) -> EventLoopFuture> { + self.configureAsyncHTTP2Pipeline( + mode: mode, + streamDelegate: nil, + configuration: configuration, + streamInitializer: streamInitializer + ) + } + + /// Configures a `ChannelPipeline` to speak HTTP/2 and sets up mapping functions so that it may be interacted with from concurrent code. + /// + /// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation. + /// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge. + /// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams. + /// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code. + /// + /// - Parameters: + /// - mode: The mode this pipeline will operate in, server or client. + /// - streamDelegate: A delegate which is called when streams are created and closed. + /// - configuration: The settings that will be used when establishing the connection and new streams. + /// - streamInitializer: A closure that will be called whenever the remote peer initiates a new stream. + /// The output of this closure is the element type of the returned multiplexer + /// - Returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can + /// be used to initiate new streams and iterate over inbound HTTP/2 stream channels. + @inlinable + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + public func configureAsyncHTTP2Pipeline( + mode: NIOHTTP2Handler.ParserMode, + streamDelegate: NIOHTTP2StreamDelegate?, + configuration: NIOHTTP2Handler.Configuration = NIOHTTP2Handler.Configuration(), + streamInitializer: @escaping NIOChannelInitializerWithOutput ) -> EventLoopFuture> { if self.eventLoop.inEventLoop { return self.eventLoop.makeCompletedFuture { return try self.pipeline.syncOperations.configureAsyncHTTP2Pipeline( mode: mode, + streamDelegate: streamDelegate, configuration: configuration, streamInitializer: streamInitializer ) @@ -503,6 +535,7 @@ extension Channel { return self.eventLoop.submit { return try self.pipeline.syncOperations.configureAsyncHTTP2Pipeline( mode: mode, + streamDelegate: streamDelegate, configuration: configuration, streamInitializer: streamInitializer ) @@ -510,6 +543,7 @@ extension Channel { } } + /// Configures a channel to perform an HTTP/2 secure upgrade with typed negotiation results. /// /// HTTP/2 secure upgrade uses the Application Layer Protocol Negotiation TLS extension to @@ -642,12 +676,46 @@ extension ChannelPipeline.SynchronousOperations { mode: NIOHTTP2Handler.ParserMode, configuration: NIOHTTP2Handler.Configuration = .init(), streamInitializer: @escaping NIOChannelInitializerWithOutput + ) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer { + try self.configureAsyncHTTP2Pipeline( + mode: mode, + streamDelegate: nil, + configuration: configuration, + streamInitializer: streamInitializer + ) + } + + /// Configures a `ChannelPipeline` to speak HTTP/2 and sets up mapping functions so that it may be interacted with from concurrent code. + /// + /// This operation **must** be called on the event loop. + /// + /// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation. + /// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge. + /// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams, + /// as it allows that pipeline to evolve without breaking your code. + /// + /// - Parameters: + /// - mode: The mode this pipeline will operate in, server or client. + /// - streamDelegate: A delegate which is called when streams are created and closed. + /// - configuration: The settings that will be used when establishing the connection and new streams. + /// - streamInitializer: A closure that will be called whenever the remote peer initiates a new stream. + /// The output of this closure is the element type of the returned multiplexer + /// - Returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can + /// be used to initiate new streams and iterate over inbound HTTP/2 stream channels. + @inlinable + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + public func configureAsyncHTTP2Pipeline( + mode: NIOHTTP2Handler.ParserMode, + streamDelegate: NIOHTTP2StreamDelegate?, + configuration: NIOHTTP2Handler.Configuration = NIOHTTP2Handler.Configuration(), + streamInitializer: @escaping NIOChannelInitializerWithOutput ) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer { let handler = NIOHTTP2Handler( mode: mode, eventLoop: self.eventLoop, connectionConfiguration: configuration.connection, streamConfiguration: configuration.stream, + streamDelegate: streamDelegate, inboundStreamInitializerWithAnyOutput: { channel in streamInitializer(channel).map { return $0 } } diff --git a/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift b/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift index e688b2f4..a0b4db14 100644 --- a/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift +++ b/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift @@ -466,6 +466,47 @@ final class ConfiguringPipelineAsyncMultiplexerTests: XCTestCase { } } + func testAsyncPipelineConfiguresStreamDelegate() async throws { + let clientRecorder = StreamRecorder() + let clientMultiplexer = try await self.clientChannel.configureAsyncHTTP2Pipeline( + mode: .client, + streamDelegate: clientRecorder + ) { channel in + channel.eventLoop.makeSucceededVoidFuture() + }.get() + + let serverRecorder = StreamRecorder() + _ = try await self.serverChannel.configureAsyncHTTP2Pipeline( + mode: .server, + streamDelegate: serverRecorder + ) { channel in + channel.pipeline.addHandler(OKResponder()) + }.get() + + try await self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel) + + for _ in 0 ..< 3 { + try await clientMultiplexer.openStream { stream in + return stream.pipeline.addHandlers(SimpleRequest()) + } + + try await Self.deliverAllBytes(from: self.clientChannel, to: self.serverChannel) + try await Self.deliverAllBytes(from: self.serverChannel, to: self.clientChannel) + } + + let expected: [StreamRecorder.Event] = [ + .init(streamID: 1, operation: .opened), .init(streamID: 1, operation: .closed), + .init(streamID: 3, operation: .opened), .init(streamID: 3, operation: .closed), + .init(streamID: 5, operation: .opened), .init(streamID: 5, operation: .closed), + ] + + XCTAssertEqual(clientRecorder.events, expected) + XCTAssertEqual(serverRecorder.events, expected) + + try await self.clientChannel.close() + try await self.serverChannel.close() + } + // Simple handler which maps server response parts to remove references to `IOData` which isn't Sendable internal final class HTTP1ServerSendability: ChannelOutboundHandler { public typealias ResponsePart = HTTPPart @@ -503,6 +544,40 @@ final class ConfiguringPipelineAsyncMultiplexerTests: XCTestCase { context.fireChannelRead(data) } } + + final class StreamRecorder: NIOHTTP2StreamDelegate, Sendable { + private let _events: NIOLockedValueBox<[Event]> + + struct Event: Sendable, Hashable { + var streamID: HTTP2StreamID + var operation: Operation + } + + enum Operation: Sendable, Hashable { + case opened + case closed + } + + var events: [Event] { + self._events.withLockedValue { $0 } + } + + init() { + self._events = NIOLockedValueBox([]) + } + + func streamCreated(_ id: HTTP2StreamID, channel: any Channel) { + self._events.withLockedValue { + $0.append(Event(streamID: id, operation: .opened)) + } + } + + func streamClosed(_ id: HTTP2StreamID, channel: any Channel) { + self._events.withLockedValue { + $0.append(Event(streamID: id, operation: .closed)) + } + } + } } #if compiler(<5.9)