Skip to content

Commit

Permalink
Make it more flexible in terms of supporting GSO (java-native-access#222
Browse files Browse the repository at this point in the history
)
  • Loading branch information
normanmaurer authored Mar 18, 2021
1 parent 19625e5 commit 91c90cb
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ public final class QuicChannelOption<T> extends ChannelOption<T> {

/**
* Use <a href="https://blog.cloudflare.com/accelerating-udp-packet-transmission-for-quic/">GSO</a>
* for QUIC packets if possible. If the number is bigger then 1 we will try to use segments.
* for QUIC packets if possible.
*/
public static final ChannelOption<Integer> UDP_SEGMENTS =
valueOf(QuicChannelOption.class, "QUIC_UDP_SEGMENTS");
public static final ChannelOption<SegmentedDatagramPacketAllocator> SEGMENTED_DATAGRAM_PACKET_ALLOCATOR =
valueOf(QuicChannelOption.class, "SEGMENTED_DATAGRAM_PACKET_ALLOCATOR");

@SuppressWarnings({ "deprecation" })
private QuicChannelOption() {
Expand Down
25 changes: 11 additions & 14 deletions src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import io.netty.channel.DefaultChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.SegmentedDatagramPacket;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.AttributeKey;
import io.netty.util.collection.LongObjectHashMap;
Expand Down Expand Up @@ -111,7 +109,6 @@ public void operationComplete(ChannelFuture future) {
private final Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray;
private final TimeoutHandler timeoutHandler = new TimeoutHandler();
private final InetSocketAddress remote;
private final boolean supportsUdpSegment;

private QuicheQuicConnection connection;
private boolean inFireChannelReadCompleteQueue;
Expand Down Expand Up @@ -153,7 +150,6 @@ private QuicheQuicChannel(Channel parent, boolean server, ByteBuffer key,
Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray) {
super(parent);
this.supportsUdpSegment = SegmentedDatagramPacket.isSupported() && parent instanceof EpollDatagramChannel;
config = new QuicheQuicChannelConfig(this);
this.server = server;
this.idGenerator = new QuicStreamIdGenerator(server);
Expand Down Expand Up @@ -877,8 +873,8 @@ private boolean isConnDestroyed() {
return connection == null;
}

private boolean connectionSendSegments(int maxSegments) {
final int bufferSize = maxSegments * Quic.MAX_DATAGRAM_SIZE;
private boolean connectionSendSegments(SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator) {
final int bufferSize = segmentedDatagramPacketAllocator.maxNumSegments() * Quic.MAX_DATAGRAM_SIZE;

long connAddr = connection.address();
boolean packetWasWritten = false;
Expand Down Expand Up @@ -908,7 +904,7 @@ private boolean connectionSendSegments(int maxSegments) {
int readable = out.readableBytes();
if (readable != 0) {
if (lastWritten != -1 && readable > lastWritten) {
parent().write(new SegmentedDatagramPacket(out, lastWritten, remote));
parent().write(segmentedDatagramPacketAllocator.newPacket(out, lastWritten, remote));
} else {
parent().write(new DatagramPacket(out, remote));
}
Expand All @@ -923,7 +919,7 @@ private boolean connectionSendSegments(int maxSegments) {
// The write was smaller then the write before. This means we can write all together as the
// last segment can be smaller then the other segments.
out.writerIndex(writerIndex + written);
parent().write(new SegmentedDatagramPacket(out, lastWritten, remote));
parent().write(segmentedDatagramPacketAllocator.newPacket(out, lastWritten, remote));
packetWasWritten = true;

out = alloc().directBuffer(bufferSize);
Expand All @@ -939,7 +935,7 @@ private boolean connectionSendSegments(int maxSegments) {
// As the last write was smaller then this write we first need to write what we had before as
// a segment can never be bigger then the previous segment. After this we will try to build a new
// chain of segments for the writes to follow.
parent().write(new SegmentedDatagramPacket(out, lastWritten, remote));
parent().write(segmentedDatagramPacketAllocator.newPacket(out, lastWritten, remote));
packetWasWritten = true;

out = newOut;
Expand All @@ -953,9 +949,9 @@ private boolean connectionSendSegments(int maxSegments) {

// check if we either built the maximum number of segments for a write or if the ByteBuf is not writable
// anymore. In this case lets write what we have and start a new chain of segments.
if (numSegments == maxSegments ||
if (numSegments == segmentedDatagramPacketAllocator.maxNumSegments() ||
!out.isWritable()) {
parent().write(new SegmentedDatagramPacket(out, lastWritten, remote));
parent().write(segmentedDatagramPacketAllocator.newPacket(out, lastWritten, remote));
packetWasWritten = true;

out = alloc().directBuffer(bufferSize);
Expand Down Expand Up @@ -1010,9 +1006,10 @@ private boolean connectionSend() {
inConnectionSend = true;
try {
boolean packetWasWritten;
int segments = supportsUdpSegment ? config.getUdpSegments() : 0;
if (segments > 0) {
packetWasWritten = connectionSendSegments(segments);
SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator =
config.getSegmentedDatagramPacketAllocator();
if (segmentedDatagramPacketAllocator.maxNumSegments() > 0) {
packetWasWritten = connectionSendSegments(segmentedDatagramPacketAllocator);
} else {
packetWasWritten = connectionSendSimple();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
final class QuicheQuicChannelConfig extends DefaultChannelConfig implements QuicChannelConfig {

private volatile QLogConfiguration qLogConfiguration;
// Try to use UDP_SEGMENT by default if possible
private volatile int udpSegment = 10;
private volatile SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator =
SegmentedDatagramPacketAllocator.NONE;

QuicheQuicChannelConfig(Channel channel) {
super(channel);
Expand All @@ -41,7 +41,7 @@ final class QuicheQuicChannelConfig extends DefaultChannelConfig implements Quic
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(),
QuicChannelOption.QLOG, QuicChannelOption.UDP_SEGMENTS);
QuicChannelOption.QLOG, QuicChannelOption.SEGMENTED_DATAGRAM_PACKET_ALLOCATOR);
}

@SuppressWarnings("unchecked")
Expand All @@ -50,8 +50,8 @@ public <T> T getOption(ChannelOption<T> option) {
if (option == QuicChannelOption.QLOG) {
return (T) getQLogConfiguration();
}
if (option == QuicChannelOption.UDP_SEGMENTS) {
return (T) Integer.valueOf(getUdpSegments());
if (option == QuicChannelOption.SEGMENTED_DATAGRAM_PACKET_ALLOCATOR) {
return (T) getSegmentedDatagramPacketAllocator();
}
return super.getOption(option);
}
Expand All @@ -62,8 +62,8 @@ public <T> boolean setOption(ChannelOption<T> option, T value) {
setQLogConfiguration((QLogConfiguration) value);
return true;
}
if (option == QuicChannelOption.UDP_SEGMENTS) {
setUdpSegments((Integer) value);
if (option == QuicChannelOption.SEGMENTED_DATAGRAM_PACKET_ALLOCATOR) {
setSegmentedDatagramPacketAllocator((SegmentedDatagramPacketAllocator) value);
return true;
}
return super.setOption(option, value);
Expand Down Expand Up @@ -147,11 +147,12 @@ private void setQLogConfiguration(QLogConfiguration qLogConfiguration) {
this.qLogConfiguration = qLogConfiguration;
}

int getUdpSegments() {
return udpSegment;
SegmentedDatagramPacketAllocator getSegmentedDatagramPacketAllocator() {
return segmentedDatagramPacketAllocator;
}

private void setUdpSegments(int udpSegment) {
this.udpSegment = udpSegment;
private void setSegmentedDatagramPacketAllocator(
SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator) {
this.segmentedDatagramPacketAllocator = segmentedDatagramPacketAllocator;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.incubator.codec.quic;

import io.netty.buffer.ByteBuf;
import io.netty.channel.socket.DatagramPacket;

import java.net.InetSocketAddress;

/**
* Used to allocate datagram packets that use UDP_SEGMENT (GSO).
*/
public interface SegmentedDatagramPacketAllocator {

/**
* {@link SegmentedDatagramPacketAllocator} which should be used if no UDP_SEGMENT is supported and used.
*/
SegmentedDatagramPacketAllocator NONE = new SegmentedDatagramPacketAllocator() {
@Override
public int maxNumSegments() {
return 0;
}

@Override
public DatagramPacket newPacket(ByteBuf buffer, int segmentSize, InetSocketAddress remoteAddress) {
throw new UnsupportedOperationException();
}
};

/**
* The maximum number of segments to use per packet.
*
* @return the segments.
*/
int maxNumSegments();

/**
* Return a new segmented {@link DatagramPacket}.
*
* @param buffer the {@link ByteBuf} that is used as content.
* @param segmentSize the size of each segment.
* @param remoteAddress the remote address to send to.
* @return the packet.
*/
DatagramPacket newPacket(ByteBuf buffer, int segmentSize, InetSocketAddress remoteAddress);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.incubator.codec.quic;

import io.netty.buffer.ByteBuf;
import io.netty.channel.epoll.SegmentedDatagramPacket;
import io.netty.channel.socket.DatagramPacket;

import java.net.InetSocketAddress;

final class EpollSegmentedDatagramPacketAllocator implements SegmentedDatagramPacketAllocator {

private final int maxNumSegments;

EpollSegmentedDatagramPacketAllocator(int maxNumSegments) {
this.maxNumSegments = maxNumSegments;
}

@Override
public int maxNumSegments() {
return maxNumSegments;
}

@Override
public DatagramPacket newPacket(ByteBuf buffer, int segmentSize, InetSocketAddress remoteAddress) {
return new SegmentedDatagramPacket(buffer, segmentSize, remoteAddress);
}

static boolean isSupported() {
return SegmentedDatagramPacket.isSupported();
}
}
11 changes: 10 additions & 1 deletion src/test/java/io/netty/incubator/codec/quic/QuicTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
package io.netty.incubator.codec.quic;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.SegmentedDatagramPacket;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
Expand Down Expand Up @@ -98,7 +101,7 @@ static QuicServerCodecBuilder newQuicServerBuilder() {
}

static QuicServerCodecBuilder newQuicServerBuilder(QuicSslContext context) {
return new QuicServerCodecBuilder()
QuicServerCodecBuilder builder = new QuicServerCodecBuilder()
.sslEngineProvider(q -> context.newEngine(q.alloc()))
.maxIdleTimeout(5000, TimeUnit.MILLISECONDS)
.initialMaxData(10000000)
Expand All @@ -108,6 +111,11 @@ static QuicServerCodecBuilder newQuicServerBuilder(QuicSslContext context) {
.initialMaxStreamsBidirectional(100)
.initialMaxStreamsUnidirectional(100)
.activeMigration(false);
if (GROUP instanceof EpollEventLoopGroup && EpollSegmentedDatagramPacketAllocator.isSupported()) {
builder.option(QuicChannelOption.SEGMENTED_DATAGRAM_PACKET_ALLOCATOR,
new EpollSegmentedDatagramPacketAllocator(10));
}
return builder;
}

private static Bootstrap newServerBootstrap(QuicServerCodecBuilder serverBuilder,
Expand Down Expand Up @@ -148,4 +156,5 @@ static void closeIfNotNull(Channel channel) throws Exception {
channel.close().sync();
}
}

}

0 comments on commit 91c90cb

Please sign in to comment.