Skip to content

Commit

Permalink
[FLINK-33750][config] Remove BoundedBlockingShuffle config options
Browse files Browse the repository at this point in the history
  • Loading branch information
Sxnan committed Sep 20, 2024
1 parent ef40181 commit 80d5186
Show file tree
Hide file tree
Showing 8 changed files with 4 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,34 +169,6 @@ public enum CompressionCodec {
+ " number of network buffers' error if you are increasing this"
+ " config value.");

/**
* Parallelism threshold to switch between sort-based blocking shuffle and hash-based blocking
* shuffle.
*
* @deprecated The hash-based blocking shuffle is deprecated in 1.20 and will be totally removed
* in 2.0.
*/
@Deprecated
public static final ConfigOption<Integer> NETWORK_SORT_SHUFFLE_MIN_PARALLELISM =
key("taskmanager.network.sort-shuffle.min-parallelism")
.intType()
.defaultValue(1)
.withDescription(
String.format(
"Parallelism threshold to switch between sort-based blocking "
+ "shuffle and hash-based blocking shuffle, which means"
+ " for batch jobs of smaller parallelism, hash-shuffle"
+ " will be used and for batch jobs of larger or equal "
+ "parallelism, sort-shuffle will be used. The value 1 "
+ "means that sort-shuffle is the default option. Note:"
+ " For production usage, you may also need to tune "
+ "'%s' and '%s' for better performance.",
NETWORK_SORT_SHUFFLE_MIN_BUFFERS.key(),
// raw string key is used here to avoid interdependence, a test
// is implemented to guard that when the target key is modified,
// this raw value must be changed correspondingly
"taskmanager.memory.framework.off-heap.batch-shuffle.size"));

/** The timeout for requesting buffers for each channel. */
@Documentation.ExcludeFromDocumentation(
"This option is purely implementation related, and may be removed as the implementation changes.")
Expand Down Expand Up @@ -242,21 +214,6 @@ public enum CompressionCodec {
+ "remote storage when the disk space is not enough. "
+ "Note: If this option is not configured the remote storage will be disabled.");

/**
* @deprecated The hash-based blocking shuffle is deprecated in 1.20 and will be totally removed
* in 2.0.
*/
@Deprecated
public static final ConfigOption<String> NETWORK_BLOCKING_SHUFFLE_TYPE =
key("taskmanager.network.blocking-shuffle.type")
.stringType()
.defaultValue("file")
.withDescription(
"The blocking shuffle type, either \"mmap\" or \"file\". The \"auto\" means selecting the property type automatically"
+ " based on system memory architecture (64 bit for mmap and 32 bit for file). Note that the memory usage of mmap is not accounted"
+ " by configured memory limits, but some resource frameworks like yarn would track this memory usage and kill the container once"
+ " memory exceeding some threshold. Also note that this option is experimental and might be changed future.");

/**
* Whether to reuse tcp connections across multi jobs. If set to true, tcp connections will not
* be released after job finishes. The subsequent jobs will be free from the overhead of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ public NettyShuffleMaster(ShuffleMasterContext shuffleMasterContext) {
maxRequiredBuffersPerGate =
conf.getOptional(
NettyShuffleEnvironmentOptions.NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE);
sortShuffleMinParallelism =
conf.get(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM);
sortShuffleMinParallelism = 1;
sortShuffleMinBuffers =
conf.get(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);
networkBufferSize = ConfigurationParserUtils.getPageSize(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,7 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(

int sortShuffleMinBuffers =
configuration.get(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);
int sortShuffleMinParallelism =
configuration.get(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM);
int sortShuffleMinParallelism = 1;

boolean isNetworkDetailedMetrics =
configuration.get(NettyShuffleEnvironmentOptions.NETWORK_DETAILED_METRICS);
Expand All @@ -335,7 +333,7 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(
configuration.get(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_REQUEST_TIMEOUT);

BoundedBlockingSubpartitionType blockingSubpartitionType =
getBlockingSubpartitionType(configuration);
BoundedBlockingSubpartitionType.FILE;

CompressionCodec compressionCodec =
configuration.get(NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC);
Expand Down Expand Up @@ -466,20 +464,6 @@ private static NettyConfig createNettyConfig(
return nettyConfig;
}

private static BoundedBlockingSubpartitionType getBlockingSubpartitionType(
Configuration config) {
String transport = config.get(NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE);

switch (transport) {
case "mmap":
return BoundedBlockingSubpartitionType.FILE_MMAP;
case "file":
return BoundedBlockingSubpartitionType.FILE;
default:
return BoundedBlockingSubpartitionType.AUTO;
}
}

// ------------------------------------------------------------------------

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.execution.Environment;
Expand Down Expand Up @@ -110,7 +109,6 @@ void testSequentialReading() throws Exception {
// if the netty server thread could not response in time, like when it is
// busy reading the files.
configuration.set(SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT, 100000);
configuration.set(NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, "file");
configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("1g"));
configuration.set(
TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse(bufferSize + "b"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.description.Formatter;
import org.apache.flink.configuration.description.HtmlFormatter;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -71,21 +69,6 @@ void testNetworkRequestBackoffAndBuffers() {
assertThat(networkConfig.partitionRequestMaxBackoff()).isEqualTo(200);
}

/** Verifies the correlation of sort-merge blocking shuffle config options. */
@Test
void testSortMergeShuffleConfigOptionsCorrelation() {
Formatter formatter = new HtmlFormatter();
ConfigOption<Integer> configOption =
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM;
String description = formatter.format(configOption.description());

String configKey =
getConfigKey(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);
assertThat(description).contains(configKey);
configKey = getConfigKey(TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY);
assertThat(description).contains(configKey);
}

private static String getConfigKey(ConfigOption<?> configOption) {
return "'" + configOption.key() + "'";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void open(OpenContext openContext) throws Exception {
}

if (deletePartitionFile) {
synchronized (BlockingShuffleITCase.class) {
synchronized (BatchShuffleITCaseBase.class) {
deleteFiles(tmpDir.toFile());
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,6 @@ public static Boolean[] params() {
return new Boolean[] {true, false};
}

@Test
public void testNoDataCompressionForBoundedBlockingShuffle() throws Exception {
Configuration configuration = new Configuration();
configuration.set(
NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC,
NettyShuffleEnvironmentOptions.CompressionCodec.NONE);
configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
configuration.set(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
Integer.MAX_VALUE);

JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH);
JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS, NUM_SLOTS);
}

@Test
public void testNoDataCompressionForSortMergeBlockingShuffle() throws Exception {
Configuration configuration = new Configuration();
Expand Down

0 comments on commit 80d5186

Please sign in to comment.