Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark public generic async methods inlinable #426

Merged
merged 4 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ extension NIOHTTP2Handler {
/// Abstracts over the integrated stream multiplexing (inline) and the chained channel handler (legacy) multiplexing approaches.
///
/// We use an enum for this purpose since we can't use a generic (for API compatibility reasons) and it allows us to avoid the cost of using an existential.
@usableFromInline
internal enum InboundStreamMultiplexer: HTTP2InboundStreamMultiplexer {
case legacy(LegacyInboundStreamMultiplexer)
case inline(InlineStreamMultiplexer)
Expand Down Expand Up @@ -149,6 +150,7 @@ extension NIOHTTP2Handler.InboundStreamMultiplexer {
/// Provides an inbound stream multiplexer interface for legacy compatibility.
///
/// This doesn't actually do any demultiplexing of inbound streams but communicates with the `HTTP2StreamChannel` which does - mostly via user inbound events.
@usableFromInline
internal struct LegacyInboundStreamMultiplexer {
let context: ChannelHandlerContext
}
Expand Down
52 changes: 29 additions & 23 deletions Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,21 @@

import NIOCore

@usableFromInline
internal struct InlineStreamMultiplexer {
private let context: ChannelHandlerContext

private let commonStreamMultiplexer: HTTP2CommonInboundStreamMultiplexer
@usableFromInline
internal let _commonStreamMultiplexer: HTTP2CommonInboundStreamMultiplexer

private let outboundView: NIOHTTP2Handler.OutboundView

/// The delegate to be notified upon stream creation and close.
private var streamDelegate: NIOHTTP2StreamDelegate?

init(context: ChannelHandlerContext, outboundView: NIOHTTP2Handler.OutboundView, mode: NIOHTTP2Handler.ParserMode, inboundStreamStateInitializer: MultiplexerAbstractChannel.InboundStreamStateInitializer, targetWindowSize: Int, streamChannelOutboundBytesHighWatermark: Int, streamChannelOutboundBytesLowWatermark: Int, streamDelegate: NIOHTTP2StreamDelegate?) {
self.context = context
self.commonStreamMultiplexer = HTTP2CommonInboundStreamMultiplexer(
self._commonStreamMultiplexer = HTTP2CommonInboundStreamMultiplexer(
mode: mode,
channel: context.channel,
inboundStreamStateInitializer: inboundStreamStateInitializer,
Expand All @@ -40,22 +43,22 @@ internal struct InlineStreamMultiplexer {

extension InlineStreamMultiplexer: HTTP2InboundStreamMultiplexer {
func receivedFrame(_ frame: HTTP2Frame) {
self.commonStreamMultiplexer.receivedFrame(frame, context: self.context, multiplexer: .inline(self))
self._commonStreamMultiplexer.receivedFrame(frame, context: self.context, multiplexer: .inline(self))
}

func streamError(streamID: HTTP2StreamID, error: Error) {
let streamError = NIOHTTP2Errors.streamError(streamID: streamID, baseError: error)
self.commonStreamMultiplexer.streamError(context: self.context, streamError)
self._commonStreamMultiplexer.streamError(context: self.context, streamError)
}

func streamCreated(event: NIOHTTP2StreamCreatedEvent) {
if let childChannel = self.commonStreamMultiplexer.streamCreated(event: event) {
if let childChannel = self._commonStreamMultiplexer.streamCreated(event: event) {
self.streamDelegate?.streamCreated(event.streamID, channel: childChannel)
}
}

func streamClosed(event: StreamClosedEvent) {
if let childChannel = self.commonStreamMultiplexer.streamClosed(event: event) {
if let childChannel = self._commonStreamMultiplexer.streamClosed(event: event) {
self.streamDelegate?.streamClosed(event.streamID, channel: childChannel)
}
}
Expand All @@ -65,16 +68,16 @@ extension InlineStreamMultiplexer: HTTP2InboundStreamMultiplexer {
// This force-unwrap is safe: we always have a connection window.
self.newConnectionWindowSize(newSize: event.inboundWindowSize!)
} else {
self.commonStreamMultiplexer.childStreamWindowUpdated(event: event)
self._commonStreamMultiplexer.childStreamWindowUpdated(event: event)
}
}

func initialStreamWindowChanged(event: NIOHTTP2BulkStreamWindowChangeEvent) {
self.commonStreamMultiplexer.initialStreamWindowChanged(event: event)
self._commonStreamMultiplexer.initialStreamWindowChanged(event: event)
}

private func newConnectionWindowSize(newSize: Int) {
guard let increment = self.commonStreamMultiplexer.newConnectionWindowSize(newSize) else {
guard let increment = self._commonStreamMultiplexer.newConnectionWindowSize(newSize) else {
return
}

Expand All @@ -90,7 +93,7 @@ extension InlineStreamMultiplexer: HTTP2OutboundStreamMultiplexer {
}

func flushStream(_ id: HTTP2StreamID) {
switch self.commonStreamMultiplexer.flushDesired() {
switch self._commonStreamMultiplexer.flushDesired() {
case .proceed:
self.outboundView.flush(context: self.context)
case .waitForReadsToComplete:
Expand All @@ -99,33 +102,33 @@ extension InlineStreamMultiplexer: HTTP2OutboundStreamMultiplexer {
}

func requestStreamID(forChannel channel: NIOCore.Channel) -> HTTP2StreamID {
self.commonStreamMultiplexer.requestStreamID(forChannel: channel)
self._commonStreamMultiplexer.requestStreamID(forChannel: channel)
}

func streamClosed(channelID: ObjectIdentifier) {
self.commonStreamMultiplexer.childChannelClosed(channelID: channelID)
self._commonStreamMultiplexer.childChannelClosed(channelID: channelID)
}

func streamClosed(id: HTTP2StreamID) {
self.commonStreamMultiplexer.childChannelClosed(streamID: id)
self._commonStreamMultiplexer.childChannelClosed(streamID: id)
}
}

extension InlineStreamMultiplexer {
internal func propagateChannelActive() {
self.commonStreamMultiplexer.propagateChannelActive(context: self.context)
self._commonStreamMultiplexer.propagateChannelActive(context: self.context)
}

internal func propagateChannelInactive() {
self.commonStreamMultiplexer.propagateChannelInactive()
self._commonStreamMultiplexer.propagateChannelInactive()
}

internal func propagateChannelWritabilityChanged() {
self.commonStreamMultiplexer.propagateChannelWritabilityChanged(context: self.context)
self._commonStreamMultiplexer.propagateChannelWritabilityChanged(context: self.context)
}

internal func propagateReadComplete() {
switch self.commonStreamMultiplexer.propagateReadComplete() {
switch self._commonStreamMultiplexer.propagateReadComplete() {
case .flushNow:
// we had marked a flush as blocked by an active read which we may now perform
self.outboundView.flush(context: self.context)
Expand All @@ -135,21 +138,22 @@ extension InlineStreamMultiplexer {
}

internal func processedFrame(frame: HTTP2Frame) {
self.commonStreamMultiplexer.processedFrame(streamID: frame.streamID, size: frame.payload.flowControlledSize)
self._commonStreamMultiplexer.processedFrame(streamID: frame.streamID, size: frame.payload.flowControlledSize)
}
}

extension InlineStreamMultiplexer {
internal func createStreamChannel(promise: EventLoopPromise<Channel>?, _ streamStateInitializer: @escaping NIOChannelInitializer) {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), promise: promise, streamStateInitializer)
self._commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), promise: promise, streamStateInitializer)
}

internal func createStreamChannel(_ streamStateInitializer: @escaping NIOChannelInitializer) -> EventLoopFuture<Channel> {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), streamStateInitializer)
self._commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), streamStateInitializer)
}

@inlinable
internal func createStreamChannel<Output: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) -> EventLoopFuture<Output> {
self.commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), initializer)
self._commonStreamMultiplexer.createStreamChannel(multiplexer: .inline(self), initializer)
}
}

Expand Down Expand Up @@ -208,7 +212,7 @@ extension NIOHTTP2Handler {

extension InlineStreamMultiplexer {
func setChannelContinuation(_ streamChannels: any AnyContinuation) {
self.commonStreamMultiplexer.setChannelContinuation(streamChannels)
self._commonStreamMultiplexer.setChannelContinuation(streamChannels)
}
}

Expand All @@ -230,10 +234,11 @@ extension NIOHTTP2Handler {
/// `Output`. This type may be `HTTP2Frame` or changed to any other type.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public struct AsyncStreamMultiplexer<InboundStreamOutput> {
private let inlineStreamMultiplexer: InlineStreamMultiplexer
@usableFromInline internal let inlineStreamMultiplexer: InlineStreamMultiplexer
public let inbound: NIOHTTP2AsyncSequence<InboundStreamOutput>

// Cannot be created by users.
@usableFromInline
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2AsyncSequence<InboundStreamOutput>) {
self.inlineStreamMultiplexer = inlineStreamMultiplexer
self.inlineStreamMultiplexer.setChannelContinuation(continuation)
Expand All @@ -245,6 +250,7 @@ extension NIOHTTP2Handler {
/// - Parameter initializer: A closure that will be called upon the created stream which is responsible for
/// initializing the stream's `Channel`.
/// - Returns: The result of the `initializer`.
@inlinable
public func openStream<Output: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) async throws -> Output {
return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get()
}
Expand Down
21 changes: 13 additions & 8 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
private static let clientMagic: StaticString = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

/// The event loop on which this handler will do work.
private let eventLoop: EventLoop?
@usableFromInline internal let _eventLoop: EventLoop?

/// The connection state machine. We always have one of these.
private var stateMachine: HTTP2ConnectionStateMachine
Expand Down Expand Up @@ -103,6 +103,8 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {

/// The delegate for (de)multiplexing inbound streams.
private var inboundStreamMultiplexerState: InboundStreamMultiplexerState

@usableFromInline
internal var inboundStreamMultiplexer: InboundStreamMultiplexer? {
return self.inboundStreamMultiplexerState.multiplexer
}
Expand Down Expand Up @@ -273,7 +275,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
maximumBufferedControlFrames: Int,
maximumResetFrameCount: Int,
resetFrameCounterWindow: TimeAmount) {
self.eventLoop = eventLoop
self._eventLoop = eventLoop
self.stateMachine = HTTP2ConnectionStateMachine(role: .init(mode), headerBlockValidation: .init(headerBlockValidation), contentLengthValidation: .init(contentLengthValidation))
self.mode = mode
self.initialSettings = initialSettings
Expand Down Expand Up @@ -312,7 +314,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
resetFrameCounterWindow: TimeAmount = .seconds(30)) {
self.stateMachine = HTTP2ConnectionStateMachine(role: .init(mode), headerBlockValidation: .init(headerBlockValidation), contentLengthValidation: .init(contentLengthValidation))
self.mode = mode
self.eventLoop = nil
self._eventLoop = nil
self.initialSettings = initialSettings
self.outboundBuffer = CompoundOutboundBuffer(mode: mode, initialMaxOutboundStreams: 100, maxBufferedControlFrames: maximumBufferedControlFrames)
self.denialOfServiceValidator = DOSHeuristics(maximumSequentialEmptyDataFrames: maximumSequentialEmptyDataFrames, maximumResetFrameCount: maximumResetFrameCount, resetFrameCounterWindow: resetFrameCounterWindow)
Expand Down Expand Up @@ -1036,6 +1038,7 @@ extension NIOHTTP2Handler {
/// The type of all `inboundStreamInitializer` callbacks which do not need to return data.
public typealias StreamInitializer = NIOChannelInitializer
/// The type of NIO Channel initializer callbacks which need to return untyped data.
@usableFromInline
internal typealias StreamInitializerWithAnyOutput = @Sendable (Channel) -> EventLoopFuture<any Sendable>

/// Creates a new ``NIOHTTP2Handler`` with a local multiplexer. (i.e. using
Expand Down Expand Up @@ -1073,6 +1076,7 @@ extension NIOHTTP2Handler {
self.inboundStreamMultiplexerState = .uninitializedInline(streamConfiguration, inboundStreamInitializer, streamDelegate)
}

@usableFromInline
internal convenience init(
mode: ParserMode,
eventLoop: EventLoop,
Expand Down Expand Up @@ -1154,12 +1158,12 @@ extension NIOHTTP2Handler {
/// i.e. it was initialized with an `inboundStreamInitializer`.
public var multiplexer: EventLoopFuture<StreamMultiplexer> {
// We need to return a future here so that we can synchronize access on the underlying `self.inboundStreamMultiplexer`
if self.eventLoop!.inEventLoop {
return self.eventLoop!.makeCompletedFuture {
if self._eventLoop!.inEventLoop {
return self._eventLoop!.makeCompletedFuture {
return try self.syncMultiplexer()
}
} else {
return self.eventLoop!.submit {
return self._eventLoop!.submit {
return try self.syncMultiplexer()
}
}
Expand All @@ -1171,7 +1175,7 @@ extension NIOHTTP2Handler {
/// > - The ``NIOHTTP2Handler`` uses a local multiplexer, i.e. it was initialized with an `inboundStreamInitializer`.
/// > - The caller is already on the correct event loop.
public func syncMultiplexer() throws -> StreamMultiplexer {
self.eventLoop!.preconditionInEventLoop()
self._eventLoop!.preconditionInEventLoop()

switch self.inboundStreamMultiplexer {
case let .some(.inline(multiplexer)):
Expand All @@ -1181,9 +1185,10 @@ extension NIOHTTP2Handler {
}
}

@inlinable
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func syncAsyncStreamMultiplexer<Output: Sendable>(continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2AsyncSequence<Output>) throws -> AsyncStreamMultiplexer<Output> {
self.eventLoop!.preconditionInEventLoop()
self._eventLoop!.preconditionInEventLoop()

switch self.inboundStreamMultiplexer {
case let .some(.inline(multiplexer)):
Expand Down
Loading