Skip to content

Commit

Permalink
[VL] Gluten-it: In auto cluster mode, add option --off-heap-ratio f…
Browse files Browse the repository at this point in the history
…or adjusting memory shares of off-heap and on-heap (apache#7286)
  • Loading branch information
zhztheplayer authored Sep 20, 2024
1 parent 5bf3585 commit 766df6f
Showing 1 changed file with 34 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import picocli.CommandLine;

import javax.management.*;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

public final class SparkRunModes {
Expand Down Expand Up @@ -211,19 +211,31 @@ private void ensureEnabled() {
}
}
}

private static class AutoClusterResource implements ClusterResource {
@CommandLine.Option(names = {"--auto-cluster-resource"}, description = "Local cluster mode: Automatically configure cluster resource", required = true)
private boolean enabled;

private final int lcWorkers;
private final int lcWorkerCores;
private final long lcWorkerHeapMem;
private final int lcExecutorCores;
private final long lcExecutorHeapMem;
private final long lcExecutorOffHeapMem;
@CommandLine.Option(names = {"--off-heap-ratio"}, description = "Local cluster mode: Ratio assigned to executor off-heap memory out of total executor memory. The rest of total will be assigned to executor on-heap memory", defaultValue = "0.67")
private double offHeapRatio;

private int lcWorkers;
private int lcWorkerCores;
private long lcWorkerHeapMem;
private int lcExecutorCores;
private long lcExecutorHeapMem;
private long lcExecutorOffHeapMem;

private final AtomicBoolean initialized = new AtomicBoolean(false);

public AutoClusterResource() {
}

private void ensureInitialized() {
if (!initialized.compareAndSet(false, true)) {
return;
}
Preconditions.checkArgument(offHeapRatio > 0 && offHeapRatio < 1, "Value of --off-heap-ratio should be in range (0, 1)");
final int totalCores = Runtime.getRuntime().availableProcessors();
final long totalMem = (long) (getTotalMem() * 0.8);
Preconditions.checkState(totalMem >= 64, "--auto-cluster-resource mode requires for at least 64 MiB physical memory available. Current: " + totalMem);
Expand All @@ -248,16 +260,16 @@ public AutoClusterResource() {
Preconditions.checkState(this.lcWorkerCores % this.lcExecutorCores == 0);
final int numExecutorsPerWorker = this.lcWorkerCores / this.lcExecutorCores;
final long executorMem = totalMem / numExecutors;
this.lcExecutorHeapMem = (long) (executorMem * 0.33);
this.lcExecutorOffHeapMem = (long) (executorMem * 0.67);
this.lcExecutorHeapMem = (long) (executorMem * (1 - offHeapRatio));
this.lcExecutorOffHeapMem = (long) (executorMem * offHeapRatio);
this.lcWorkerHeapMem = this.lcExecutorHeapMem * numExecutorsPerWorker;
System.out.printf("Automatically configured cluster resource settings: %n" +
" lcWorkers: [%d]%n" +
" lcWorkerCores: [%d]%n" +
" lcWorkerHeapMem: [%dMiB]%n" +
" lcExecutorCores: [%d]%n" +
" lcExecutorHeapMem: [%dMiB]%n" +
" lcExecutorOffHeapMem: [%dMiB]%n",
" lcWorkers: [%d]%n" +
" lcWorkerCores: [%d]%n" +
" lcWorkerHeapMem: [%dMiB]%n" +
" lcExecutorCores: [%d]%n" +
" lcExecutorHeapMem: [%dMiB]%n" +
" lcExecutorOffHeapMem: [%dMiB]%n",
lcWorkers,
lcWorkerCores,
lcWorkerHeapMem,
Expand All @@ -279,31 +291,37 @@ private static long getTotalMem() {

/**
 * Returns the automatically configured {@code lcWorkers} value.
 *
 * <p>Calls {@code ensureInitialized()} first, so the resource settings are
 * computed on first access rather than at construction time.
 *
 * @return the {@code lcWorkers} setting
 */
@Override
public int lcWorkers() {
  this.ensureInitialized();
  return this.lcWorkers;
}

/**
 * Returns the automatically configured {@code lcWorkerCores} value.
 *
 * <p>Calls {@code ensureInitialized()} first, so the resource settings are
 * computed on first access rather than at construction time.
 *
 * @return the {@code lcWorkerCores} setting
 */
@Override
public int lcWorkerCores() {
  this.ensureInitialized();
  return this.lcWorkerCores;
}

/**
 * Returns the automatically configured {@code lcWorkerHeapMem} value.
 *
 * <p>Calls {@code ensureInitialized()} first, so the resource settings are
 * computed on first access rather than at construction time. That method
 * derives this value as the executor heap memory multiplied by the number of
 * executors per worker, and labels it as MiB in its printed summary.
 *
 * @return the {@code lcWorkerHeapMem} setting
 */
@Override
public long lcWorkerHeapMem() {
  this.ensureInitialized();
  return this.lcWorkerHeapMem;
}

/**
 * Returns the automatically configured {@code lcExecutorCores} value.
 *
 * <p>Calls {@code ensureInitialized()} first, so the resource settings are
 * computed on first access rather than at construction time.
 *
 * @return the {@code lcExecutorCores} setting
 */
@Override
public int lcExecutorCores() {
  this.ensureInitialized();
  return this.lcExecutorCores;
}

/**
 * Returns the automatically configured {@code lcExecutorHeapMem} value.
 *
 * <p>Calls {@code ensureInitialized()} first, so the resource settings are
 * computed on first access rather than at construction time. That method
 * assigns this value as the {@code (1 - offHeapRatio)} share of the
 * per-executor memory, and labels it as MiB in its printed summary.
 *
 * @return the {@code lcExecutorHeapMem} setting
 */
@Override
public long lcExecutorHeapMem() {
  this.ensureInitialized();
  return this.lcExecutorHeapMem;
}

/**
 * Returns the automatically configured {@code lcExecutorOffHeapMem} value.
 *
 * <p>Calls {@code ensureInitialized()} first, so the resource settings are
 * computed on first access rather than at construction time. That method
 * assigns this value as the {@code offHeapRatio} share of the per-executor
 * memory, and labels it as MiB in its printed summary.
 *
 * @return the {@code lcExecutorOffHeapMem} setting
 */
@Override
public long lcExecutorOffHeapMem() {
  this.ensureInitialized();
  return this.lcExecutorOffHeapMem;
}
}
Expand Down

0 comments on commit 766df6f

Please sign in to comment.