Skip to content

Commit

Permalink
[FLINK-33750][config] Remove floating/exclusive buffer config options
Browse files Browse the repository at this point in the history
  • Loading branch information
Sxnan committed Sep 20, 2024
1 parent 21c1360 commit ef40181
Show file tree
Hide file tree
Showing 14 changed files with 35 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,72 +149,6 @@ public enum CompressionCodec {
+ " and 1000 for batch workloads. If explicitly configured, the"
+ " configured value should be at least 1.");

/**
* Number of network buffers for each outgoing/incoming channel (subpartition/input channel).
* The minimum valid value for the option is 0. When the option is configured as 0, the
* exclusive network buffers used per downstream incoming channel will be 0, but for each
* upstream outgoing channel, max(1, configured value) will be used. In other words we ensure
* that, for performance reasons, at least one buffer is used per outgoing channel regardless of
* the configuration.
*
* @deprecated This option is deprecated in 1.20 and will be removed in 2.0 to simplify the
* configuration of network buffers.
*/
@Deprecated
public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
key("taskmanager.network.memory.buffers-per-channel")
.intType()
.defaultValue(2)
.withDescription(
String.format(
"Number of exclusive network buffers for each outgoing/incoming"
+ " channel (subpartition/input channel) in the credit-based"
+ " flow control model. For the outgoing channel(subpartition),"
+ " this value is the effective exclusive buffers per channel."
+ " For the incoming channel(input channel), this value"
+ " is the max number of exclusive buffers per channel,"
+ " the number of effective exclusive network buffers per"
+ " channel is dynamically calculated from %s and the"
+ " effective range is from 0 to the configured value."
+ " The minimum valid value for the option is 0. When"
+ " the option is configured as 0, the exclusive network"
+ " buffers used by downstream incoming channel will be"
+ " 0, but for each upstream outgoing channel, max(1,"
+ " configured value) will be used. In other words, we"
+ " ensure that, for performance reasons, at least one"
+ " buffer is used per outgoing channel regardless of"
+ " the configuration.",
NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE.key()));

/**
* Number of floating network buffers for each outgoing/incoming gate (result partition/input
* gate).
*
* @deprecated This option is deprecated in 1.20 and will be removed in 2.0 to simplify the
* configuration of network buffers.
*/
@Deprecated
public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
key("taskmanager.network.memory.floating-buffers-per-gate")
.intType()
.defaultValue(8)
.withDescription(
String.format(
"Number of floating network buffers for each outgoing/incoming"
+ " gate (result partition/input gate). In credit-based"
+ " flow control mode, this indicates how many floating"
+ " credits are shared among all the channels. The floating"
+ " buffers can help relieve back-pressure caused by"
+ " unbalanced data distribution among the subpartitions."
+ " For the outgoing gate(result partition), this value"
+ " is the effective floating buffers per gate. For the"
+ " incoming gate(input gate), this value is a recommended"
+ " number of floating buffers, the number of effective"
+ " floating network buffers per gate is dynamically"
+ " calculated from %s and the range of effective floating"
+ " buffers is from 0 to (parallelism - 1).",
NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE.key()));

/**
* Minimum number of network buffers required per blocking result partition for sort-shuffle.
*/
Expand Down Expand Up @@ -263,25 +197,6 @@ public enum CompressionCodec {
// this raw value must be changed correspondingly
"taskmanager.memory.framework.off-heap.batch-shuffle.size"));

/**
* Number of max buffers can be used for each output subpartition.
*
* @deprecated This option is deprecated in 1.20 and will be removed in 2.0 to simplify the
* configuration of network buffers.
*/
@Deprecated
public static final ConfigOption<Integer> NETWORK_MAX_BUFFERS_PER_CHANNEL =
key("taskmanager.network.memory.max-buffers-per-channel")
.intType()
.defaultValue(10)
.withDescription(
"Number of max buffers that can be used for each channel. If a channel exceeds the number of max"
+ " buffers, it will make the task become unavailable, cause the back pressure and block the data processing. This"
+ " might speed up checkpoint alignment by preventing excessive growth of the buffered in-flight data in"
+ " case of data skew and high number of configured floating buffers. This limit is not strictly guaranteed,"
+ " and can be ignored by things like flatMap operators, records spanning multiple buffers or single timer"
+ " producing large amount of data.");

/** The timeout for requesting buffers for each channel. */
@Documentation.ExcludeFromDocumentation(
"This option is purely implementation related, and may be removed as the implementation changes.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@ public class PipelinedSubpartition extends ResultSubpartition implements Channel

// ------------------------------------------------------------------------

/**
* Number of exclusive credits per input channel at the downstream tasks configured by {@link
* org.apache.flink.configuration.NettyShuffleEnvironmentOptions#NETWORK_BUFFERS_PER_CHANNEL}.
*/
/** Number of exclusive credits per input channel at the downstream tasks. */
private final int receiverExclusiveBuffersPerChannel;

/** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,8 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
public NettyShuffleMaster(ShuffleMasterContext shuffleMasterContext) {
Configuration conf = shuffleMasterContext.getConfiguration();
checkNotNull(conf);
buffersPerInputChannel =
conf.get(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
floatingBuffersPerGate =
conf.get(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
buffersPerInputChannel = 2;
floatingBuffersPerGate = 8;
maxRequiredBuffersPerGate =
conf.getOptional(
NettyShuffleEnvironmentOptions.NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE);
Expand All @@ -94,11 +92,6 @@ public NettyShuffleMaster(ShuffleMasterContext shuffleMasterContext) {
"At least one buffer is required for each gate, please increase the value of %s.",
NettyShuffleEnvironmentOptions.NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE
.key()));
checkArgument(
floatingBuffersPerGate >= 1,
String.format(
"The configured floating buffer should be at least 1, please increase the value of %s.",
NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.key()));
}

@Override
Expand Down Expand Up @@ -147,11 +140,9 @@ private static PartitionConnectionInfo createConnectionInfo(
/**
* JM announces network memory requirement from the calculating result of this method. Please
* note that the calculating algorithm depends on both I/O details of a vertex and network
* configuration, e.g. {@link NettyShuffleEnvironmentOptions#NETWORK_BUFFERS_PER_CHANNEL} and
* {@link NettyShuffleEnvironmentOptions#NETWORK_EXTRA_BUFFERS_PER_GATE}, which means we should
* always keep the consistency of configurations between JM, RM and TM in fine-grained resource
* management, thus to guarantee that the processes of memory announcing and allocating respect
* each other.
* configuration, which means we should always keep the consistency of configurations between
* JM, RM and TM in fine-grained resource management, thus to guarantee that the processes of
* memory announcing and allocating respect each other.
*/
@Override
public MemorySize computeShuffleMemorySizeForTask(TaskInputsOutputsDescriptor desc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,17 +304,14 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(
.NETWORK_PARTITION_REQUEST_TIMEOUT)
.toMillis();

int buffersPerChannel =
configuration.get(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
int extraBuffersPerGate =
configuration.get(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
int buffersPerChannel = 2;
int extraBuffersPerGate = 8;

Optional<Integer> maxRequiredBuffersPerGate =
configuration.getOptional(
NettyShuffleEnvironmentOptions.NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE);

int maxBuffersPerChannel =
configuration.get(NettyShuffleEnvironmentOptions.NETWORK_MAX_BUFFERS_PER_CHANNEL);
int maxBuffersPerChannel = 10;

long batchShuffleReadMemoryBytes =
configuration.get(TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY).getBytes();
Expand Down Expand Up @@ -349,18 +346,12 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(
configuration.get(
NettyShuffleEnvironmentOptions.TCP_CONNECTION_REUSE_ACROSS_JOBS_ENABLED);

checkArgument(buffersPerChannel >= 0, "Must be non-negative.");
checkArgument(
!maxRequiredBuffersPerGate.isPresent() || maxRequiredBuffersPerGate.get() >= 1,
String.format(
"At least one buffer is required for each gate, please increase the value of %s.",
NettyShuffleEnvironmentOptions.NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE
.key()));
checkArgument(
extraBuffersPerGate >= 1,
String.format(
"The configured floating buffer should be at least 1, please increase the value of %s.",
NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.key()));

TieredStorageConfiguration tieredStorageConfiguration = null;
if ((configuration.get(BATCH_SHUFFLE_MODE) == ALL_EXCHANGES_HYBRID_FULL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.metrics.CharacterFilter;
Expand Down Expand Up @@ -104,8 +103,7 @@ static void shutdown() throws Exception {
void testRegisterTaskWithLimitedBuffers() throws Exception {
// outgoing: 1 buffer per channel + 1 extra buffer per ResultPartition
// incoming: 2 exclusive buffers per channel + 1 floating buffer per single gate
final int bufferCount =
18 + 10 * NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
final int bufferCount = 18 + 10 * 2;

testRegisterTaskWithLimitedBuffers(bufferCount);
}
Expand All @@ -118,12 +116,7 @@ void testRegisterTaskWithLimitedBuffers() throws Exception {
void testRegisterTaskWithInsufficientBuffers() throws Exception {
// outgoing: 1 buffer per channel + 1 extra buffer per ResultPartition
// incoming: 2 exclusive buffers per channel + 1 floating buffer per single gate
final int bufferCount =
10
+ 10
* NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL
.defaultValue()
- 1;
final int bufferCount = 10 + 10 * 2 - 1;

assertThatThrownBy(() -> testRegisterTaskWithLimitedBuffers(bufferCount))
.isInstanceOf(IOException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ void testNetworkBufferNumberCalculation() {
* Verifies that {@link NettyShuffleEnvironmentConfiguration#fromConfiguration(Configuration,
* MemorySize, boolean, InetAddress)} returns the correct result for new configurations via
* {@link NettyShuffleEnvironmentOptions#NETWORK_REQUEST_BACKOFF_INITIAL}, {@link
* NettyShuffleEnvironmentOptions#NETWORK_REQUEST_BACKOFF_MAX}, {@link
* NettyShuffleEnvironmentOptions#NETWORK_BUFFERS_PER_CHANNEL} and {@link
* NettyShuffleEnvironmentOptions#NETWORK_EXTRA_BUFFERS_PER_GATE}
* NettyShuffleEnvironmentOptions#NETWORK_REQUEST_BACKOFF_MAX}
*/
@Test
void testNetworkRequestBackoffAndBuffers() {
Expand All @@ -64,17 +62,13 @@ void testNetworkRequestBackoffAndBuffers() {
final Configuration config = new Configuration();
config.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
config.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
config.set(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, 10);
config.set(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);

final NettyShuffleEnvironmentConfiguration networkConfig =
NettyShuffleEnvironmentConfiguration.fromConfiguration(
config, MEM_SIZE_PARAM, true, InetAddress.getLoopbackAddress());

assertThat(networkConfig.partitionRequestInitialBackoff()).isEqualTo(100);
assertThat(networkConfig.partitionRequestMaxBackoff()).isEqualTo(200);
assertThat(networkConfig.networkBuffersPerChannel()).isEqualTo(10);
assertThat(networkConfig.floatingNetworkBuffersPerGate()).isEqualTo(100);
}

/** Verifies the correlation of sort-merge blocking shuffle config options. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.streaming.runtime.io.benchmark;

import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;

import org.junit.jupiter.api.Test;

import java.io.IOException;
Expand Down Expand Up @@ -82,11 +80,7 @@ void remoteModeInsufficientBuffersSender() {
100,
false,
writers * channels - 1,
writers
* channels
* NettyShuffleEnvironmentOptions
.NETWORK_BUFFERS_PER_CHANNEL
.defaultValue()))
writers * channels * 2))
.isInstanceOf(IOException.class)
.hasMessageContaining("Insufficient number of network buffers");
}
Expand All @@ -105,12 +99,7 @@ void remoteModeInsufficientBuffersReceiver() throws Exception {
100,
false,
writers * channels,
writers
* channels
* NettyShuffleEnvironmentOptions
.NETWORK_BUFFERS_PER_CHANNEL
.defaultValue()
- 1))
writers * channels * 2 - 1))
.isInstanceOf(IOException.class)
.hasMessageContaining("Insufficient number of network buffers");
}
Expand All @@ -127,11 +116,7 @@ void remoteModeMinimumBuffers() throws Exception {
100,
false,
writers * channels + writers,
writers
+ writers
* channels
* NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL
.defaultValue());
writers + writers * channels * 2);
env.executeBenchmark(10_000);
env.tearDown();
}
Expand Down
Loading

0 comments on commit ef40181

Please sign in to comment.