Skip to content

Commit

Permalink
Add a variant of configureAsyncHTTP2Pipeline which takes a stream del…
Browse files Browse the repository at this point in the history
…egate (#439)

Motivation:

`configureAsyncHTTP2Pipeline` doesn't allow a stream delegate to be
specified. As the async pipeline uses the "inline" stream multiplexer
there's no way to account for streams within the connection channel.

Modifications:

- Add a sync and async variants of `configureAsyncHTTP2Pipeline` which
  accepts an optional stream delegate
- Rewrite the existing helpers in terms of the new one

Result:

Users can configure an async http pipeline with a stream delegate.
  • Loading branch information
glbrntt authored May 16, 2024
1 parent 356a3af commit 8d8eb60
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 0 deletions.
68 changes: 68 additions & 0 deletions Sources/NIOHTTP2/HTTP2PipelineHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -490,11 +490,43 @@ extension Channel {
mode: NIOHTTP2Handler.ParserMode,
configuration: NIOHTTP2Handler.Configuration = .init(),
streamInitializer: @escaping NIOChannelInitializerWithOutput<Output>
) -> EventLoopFuture<NIOHTTP2Handler.AsyncStreamMultiplexer<Output>> {
self.configureAsyncHTTP2Pipeline(
mode: mode,
streamDelegate: nil,
configuration: configuration,
streamInitializer: streamInitializer
)
}

/// Configures a `ChannelPipeline` to speak HTTP/2 and sets up mapping functions so that it may be interacted with from concurrent code.
///
/// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation.
/// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge.
/// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams.
/// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code.
///
/// - Parameters:
/// - mode: The mode this pipeline will operate in, server or client.
/// - streamDelegate: A delegate which is called when streams are created and closed.
/// - configuration: The settings that will be used when establishing the connection and new streams.
/// - streamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// The output of this closure is the element type of the returned multiplexer
/// - Returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can
/// be used to initiate new streams and iterate over inbound HTTP/2 stream channels.
@inlinable
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public func configureAsyncHTTP2Pipeline<Output: Sendable>(
mode: NIOHTTP2Handler.ParserMode,
streamDelegate: NIOHTTP2StreamDelegate?,
configuration: NIOHTTP2Handler.Configuration = NIOHTTP2Handler.Configuration(),
streamInitializer: @escaping NIOChannelInitializerWithOutput<Output>
) -> EventLoopFuture<NIOHTTP2Handler.AsyncStreamMultiplexer<Output>> {
if self.eventLoop.inEventLoop {
return self.eventLoop.makeCompletedFuture {
return try self.pipeline.syncOperations.configureAsyncHTTP2Pipeline(
mode: mode,
streamDelegate: streamDelegate,
configuration: configuration,
streamInitializer: streamInitializer
)
Expand All @@ -503,13 +535,15 @@ extension Channel {
return self.eventLoop.submit {
return try self.pipeline.syncOperations.configureAsyncHTTP2Pipeline(
mode: mode,
streamDelegate: streamDelegate,
configuration: configuration,
streamInitializer: streamInitializer
)
}
}
}


/// Configures a channel to perform an HTTP/2 secure upgrade with typed negotiation results.
///
/// HTTP/2 secure upgrade uses the Application Layer Protocol Negotiation TLS extension to
Expand Down Expand Up @@ -642,12 +676,46 @@ extension ChannelPipeline.SynchronousOperations {
mode: NIOHTTP2Handler.ParserMode,
configuration: NIOHTTP2Handler.Configuration = .init(),
streamInitializer: @escaping NIOChannelInitializerWithOutput<Output>
) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer<Output> {
try self.configureAsyncHTTP2Pipeline(
mode: mode,
streamDelegate: nil,
configuration: configuration,
streamInitializer: streamInitializer
)
}

/// Configures a `ChannelPipeline` to speak HTTP/2 and sets up mapping functions so that it may be interacted with from concurrent code.
///
/// This operation **must** be called on the event loop.
///
/// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation.
/// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge.
/// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams,
/// as it allows that pipeline to evolve without breaking your code.
///
/// - Parameters:
/// - mode: The mode this pipeline will operate in, server or client.
/// - streamDelegate: A delegate which is called when streams are created and closed.
/// - configuration: The settings that will be used when establishing the connection and new streams.
/// - streamInitializer: A closure that will be called whenever the remote peer initiates a new stream.
/// The output of this closure is the element type of the returned multiplexer
/// - Returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can
/// be used to initiate new streams and iterate over inbound HTTP/2 stream channels.
@inlinable
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public func configureAsyncHTTP2Pipeline<Output: Sendable>(
mode: NIOHTTP2Handler.ParserMode,
streamDelegate: NIOHTTP2StreamDelegate?,
configuration: NIOHTTP2Handler.Configuration = NIOHTTP2Handler.Configuration(),
streamInitializer: @escaping NIOChannelInitializerWithOutput<Output>
) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer<Output> {
let handler = NIOHTTP2Handler(
mode: mode,
eventLoop: self.eventLoop,
connectionConfiguration: configuration.connection,
streamConfiguration: configuration.stream,
streamDelegate: streamDelegate,
inboundStreamInitializerWithAnyOutput: { channel in
streamInitializer(channel).map { return $0 }
}
Expand Down
75 changes: 75 additions & 0 deletions Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,47 @@ final class ConfiguringPipelineAsyncMultiplexerTests: XCTestCase {
}
}

func testAsyncPipelineConfiguresStreamDelegate() async throws {
let clientRecorder = StreamRecorder()
let clientMultiplexer = try await self.clientChannel.configureAsyncHTTP2Pipeline(
mode: .client,
streamDelegate: clientRecorder
) { channel in
channel.eventLoop.makeSucceededVoidFuture()
}.get()

let serverRecorder = StreamRecorder()
_ = try await self.serverChannel.configureAsyncHTTP2Pipeline(
mode: .server,
streamDelegate: serverRecorder
) { channel in
channel.pipeline.addHandler(OKResponder())
}.get()

try await self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel)

for _ in 0 ..< 3 {
try await clientMultiplexer.openStream { stream in
return stream.pipeline.addHandlers(SimpleRequest())
}

try await Self.deliverAllBytes(from: self.clientChannel, to: self.serverChannel)
try await Self.deliverAllBytes(from: self.serverChannel, to: self.clientChannel)
}

let expected: [StreamRecorder.Event] = [
.init(streamID: 1, operation: .opened), .init(streamID: 1, operation: .closed),
.init(streamID: 3, operation: .opened), .init(streamID: 3, operation: .closed),
.init(streamID: 5, operation: .opened), .init(streamID: 5, operation: .closed),
]

XCTAssertEqual(clientRecorder.events, expected)
XCTAssertEqual(serverRecorder.events, expected)

try await self.clientChannel.close()
try await self.serverChannel.close()
}

// Simple handler which maps server response parts to remove references to `IOData` which isn't Sendable
internal final class HTTP1ServerSendability: ChannelOutboundHandler {
public typealias ResponsePart = HTTPPart<HTTPResponseHead, ByteBuffer>
Expand Down Expand Up @@ -503,6 +544,40 @@ final class ConfiguringPipelineAsyncMultiplexerTests: XCTestCase {
context.fireChannelRead(data)
}
}

final class StreamRecorder: NIOHTTP2StreamDelegate, Sendable {
private let _events: NIOLockedValueBox<[Event]>

struct Event: Sendable, Hashable {
var streamID: HTTP2StreamID
var operation: Operation
}

enum Operation: Sendable, Hashable {
case opened
case closed
}

var events: [Event] {
self._events.withLockedValue { $0 }
}

init() {
self._events = NIOLockedValueBox([])
}

func streamCreated(_ id: HTTP2StreamID, channel: any Channel) {
self._events.withLockedValue {
$0.append(Event(streamID: id, operation: .opened))
}
}

func streamClosed(_ id: HTTP2StreamID, channel: any Channel) {
self._events.withLockedValue {
$0.append(Event(streamID: id, operation: .closed))
}
}
}
}

#if compiler(<5.9)
Expand Down

0 comments on commit 8d8eb60

Please sign in to comment.