Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Bazel more responsive and use less memory when --jobs is high #17120

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ public ImmutableList<SpawnResult> exec(
spawnLogContext.logSpawn(
spawn,
actionExecutionContext.getMetadataProvider(),
context.getInputMapping(PathFragment.EMPTY_FRAGMENT),
context.getInputMapping(PathFragment.EMPTY_FRAGMENT,
/* willAccessRepeatedly = */ false),
context.getTimeout(),
spawnResult);
} catch (IOException | ForbiddenActionInputException e) {
Expand Down Expand Up @@ -246,7 +247,9 @@ public ListenableFuture<Void> prefetchInputs()
return actionExecutionContext
.getActionInputPrefetcher()
.prefetchFiles(
getInputMapping(PathFragment.EMPTY_FRAGMENT).values(), getMetadataProvider());
getInputMapping(PathFragment.EMPTY_FRAGMENT,
/* willAccessRepeatedly = */ true).values(),
getMetadataProvider());
}

return immediateVoidFuture();
Expand Down Expand Up @@ -306,22 +309,33 @@ public FileOutErr getFileOutErr() {
}

@Override
public SortedMap<PathFragment, ActionInput> getInputMapping(PathFragment baseDirectory)
public SortedMap<PathFragment, ActionInput> getInputMapping(PathFragment baseDirectory,
boolean willAccessRepeatedly)
throws IOException, ForbiddenActionInputException {
if (lazyInputMapping == null || !inputMappingBaseDirectory.equals(baseDirectory)) {
try (SilentCloseable c =
Profiler.instance().profile("AbstractSpawnStrategy.getInputMapping")) {
inputMappingBaseDirectory = baseDirectory;
lazyInputMapping =
spawnInputExpander.getInputMapping(
spawn,
actionExecutionContext.getArtifactExpander(),
baseDirectory,
actionExecutionContext.getMetadataProvider());
}
// Return previously computed copy if present.
if (lazyInputMapping != null && inputMappingBaseDirectory.equals(baseDirectory)) {
return lazyInputMapping;
}

SortedMap<PathFragment, ActionInput> inputMapping;
try (SilentCloseable c =
Profiler.instance().profile("AbstractSpawnStrategy.getInputMapping")) {
inputMapping =
spawnInputExpander.getInputMapping(
spawn,
actionExecutionContext.getArtifactExpander(),
baseDirectory,
actionExecutionContext.getMetadataProvider());
}

return lazyInputMapping;
// Don't cache the input mapping if it is unlikely that it is used again.
// This reduces memory usage in the case where remote caching/execution is
// used, and the expected cache hit rate is high.
if (willAccessRepeatedly) {
inputMappingBaseDirectory = baseDirectory;
lazyInputMapping = inputMapping;
}
return inputMapping;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ void lockOutputFiles(int exitCode, String errorMessage, FileOutErr outErr)
* mapping is used in a context where the directory relative to which the keys are interpreted
* is not the same as the execroot.
*/
SortedMap<PathFragment, ActionInput> getInputMapping(PathFragment baseDirectory)
SortedMap<PathFragment, ActionInput> getInputMapping(PathFragment baseDirectory, boolean willAccessRepeatedly)
throws IOException, ForbiddenActionInputException;

/** Reports a progress update to the Spawn strategy. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.devtools.build.lib.vfs.PathFragment;
import java.io.IOException;
import java.util.SortedMap;
import javax.annotation.Nullable;

/** A value class representing an action which can be executed remotely. */
public class RemoteAction {
Expand All @@ -36,7 +37,9 @@ public class RemoteAction {
private final SpawnExecutionContext spawnExecutionContext;
private final RemoteActionExecutionContext remoteActionExecutionContext;
private final RemotePathResolver remotePathResolver;
private final MerkleTree merkleTree;
@Nullable private final MerkleTree merkleTree;
private final long inputBytes;
private final long inputFiles;
private final Digest commandHash;
private final Command command;
private final Action action;
Expand All @@ -51,12 +54,15 @@ public class RemoteAction {
Digest commandHash,
Command command,
Action action,
ActionKey actionKey) {
ActionKey actionKey,
boolean remoteDiscardMerkleTrees) {
this.spawn = spawn;
this.spawnExecutionContext = spawnExecutionContext;
this.remoteActionExecutionContext = remoteActionExecutionContext;
this.remotePathResolver = remotePathResolver;
this.merkleTree = merkleTree;
this.merkleTree = remoteDiscardMerkleTrees ? null : merkleTree;
this.inputBytes = merkleTree.getInputBytes();
this.inputFiles = merkleTree.getInputFiles();
this.commandHash = commandHash;
this.command = command;
this.action = action;
Expand All @@ -80,12 +86,12 @@ public Spawn getSpawn() {
* Returns the sum of file sizes plus protobuf sizes used to represent the inputs of this action.
*/
public long getInputBytes() {
return merkleTree.getInputBytes();
return inputBytes;
}

/** Returns the number of input files of this action. */
public long getInputFiles() {
return merkleTree.getInputFiles();
return inputFiles;
}

/** Returns the id this is action. */
Expand All @@ -111,6 +117,7 @@ public Command getCommand() {
return command;
}

@Nullable
public MerkleTree getMerkleTree() {
return merkleTree;
}
Expand All @@ -119,9 +126,9 @@ public MerkleTree getMerkleTree() {
* Returns a {@link SortedMap} which maps from input paths for remote action to {@link
* ActionInput}.
*/
public SortedMap<PathFragment, ActionInput> getInputMap()
public SortedMap<PathFragment, ActionInput> getInputMap(boolean willAccessRepeatedly)
throws IOException, ForbiddenActionInputException {
return remotePathResolver.getInputMapping(spawnExecutionContext);
return remotePathResolver.getInputMapping(spawnExecutionContext, willAccessRepeatedly);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.io.IOException;
import java.lang.Runtime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -142,6 +143,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -377,7 +379,9 @@ private MerkleTree buildInputMerkleTree(
}
return MerkleTree.merge(subMerkleTrees, digestUtil);
} else {
SortedMap<PathFragment, ActionInput> inputMap = remotePathResolver.getInputMapping(context);
SortedMap<PathFragment, ActionInput> inputMap =
remotePathResolver.getInputMapping(
context, /* willAccessRepeatedly = */ !remoteOptions.remoteDiscardMerkleTrees);
if (!outputDirMap.isEmpty()) {
// The map returned by getInputMapping is mutable, but must not be mutated here as it is
// shared with all other strategies.
Expand Down Expand Up @@ -436,63 +440,90 @@ private static ByteString buildSalt(Spawn spawn) {
return null;
}

/**
* Semaphore for limiting the concurrent number of Merkle tree input roots we
* compute and keep in memory.
*
* When --jobs is set to a high value to let the remote execution service runs
* many actions in parallel, there is no point in letting the local system
* compute Merkle trees of input roots with the same amount of parallelism.
* Not only does this make Bazel feel sluggish and slow to respond to being
* interrupted, it causes it to exhaust memory.
*
* As there is no point in letting Merkle tree input root computation use a
* higher concurrency than the number of CPUs in the system, use a semaphore
* to limit the concurrency of buildRemoteAction().
*/
private final Semaphore remoteActionBuildingSemaphore =
new Semaphore(Runtime.getRuntime().availableProcessors(), true);

private ToolSignature getToolSignature(Spawn spawn, SpawnExecutionContext context)
throws IOException, ExecException, ForbiddenActionInputException, InterruptedException {
return remoteOptions.markToolInputs
&& Spawns.supportsWorkers(spawn)
&& !spawn.getToolFiles().isEmpty()
? computePersistentWorkerSignature(spawn, context)
: null;
}

/** Creates a new {@link RemoteAction} instance from spawn. */
public RemoteAction buildRemoteAction(Spawn spawn, SpawnExecutionContext context)
throws IOException, ExecException, ForbiddenActionInputException, InterruptedException {
ToolSignature toolSignature =
remoteOptions.markToolInputs
&& Spawns.supportsWorkers(spawn)
&& !spawn.getToolFiles().isEmpty()
? computePersistentWorkerSignature(spawn, context)
: null;
final MerkleTree merkleTree = buildInputMerkleTree(spawn, context, toolSignature);

// Get the remote platform properties.
Platform platform = PlatformUtils.getPlatformProto(spawn, remoteOptions);
if (toolSignature != null) {
platform =
PlatformUtils.getPlatformProto(
spawn, remoteOptions, ImmutableMap.of("persistentWorkerKey", toolSignature.key));
} else {
platform = PlatformUtils.getPlatformProto(spawn, remoteOptions);
}

Command command =
buildCommand(
spawn.getOutputFiles(),
spawn.getArguments(),
spawn.getEnvironment(),
platform,
remotePathResolver);
Digest commandHash = digestUtil.compute(command);
Action action =
Utils.buildAction(
commandHash,
merkleTree.getRootDigest(),
platform,
context.getTimeout(),
Spawns.mayBeCachedRemotely(spawn),
buildSalt(spawn));

ActionKey actionKey = digestUtil.computeActionKey(action);

RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(
buildRequestId, commandId, actionKey.getDigest().getHash(), spawn.getResourceOwner());
RemoteActionExecutionContext remoteActionExecutionContext =
RemoteActionExecutionContext.create(
spawn, metadata, getWriteCachePolicy(spawn), getReadCachePolicy(spawn));

return new RemoteAction(
spawn,
context,
remoteActionExecutionContext,
remotePathResolver,
merkleTree,
commandHash,
command,
action,
actionKey);
remoteActionBuildingSemaphore.acquire();
try {
ToolSignature toolSignature = getToolSignature(spawn, context);
final MerkleTree merkleTree = buildInputMerkleTree(spawn, context, toolSignature);

// Get the remote platform properties.
Platform platform = PlatformUtils.getPlatformProto(spawn, remoteOptions);
if (toolSignature != null) {
platform =
PlatformUtils.getPlatformProto(
spawn, remoteOptions, ImmutableMap.of("persistentWorkerKey", toolSignature.key));
} else {
platform = PlatformUtils.getPlatformProto(spawn, remoteOptions);
}

Command command =
buildCommand(
spawn.getOutputFiles(),
spawn.getArguments(),
spawn.getEnvironment(),
platform,
remotePathResolver);
Digest commandHash = digestUtil.compute(command);
Action action =
Utils.buildAction(
commandHash,
merkleTree.getRootDigest(),
platform,
context.getTimeout(),
Spawns.mayBeCachedRemotely(spawn),
buildSalt(spawn));

ActionKey actionKey = digestUtil.computeActionKey(action);

RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(
buildRequestId, commandId, actionKey.getDigest().getHash(), spawn.getResourceOwner());
RemoteActionExecutionContext remoteActionExecutionContext =
RemoteActionExecutionContext.create(
spawn, metadata, getWriteCachePolicy(spawn), getReadCachePolicy(spawn));

return new RemoteAction(
spawn,
context,
remoteActionExecutionContext,
remotePathResolver,
merkleTree,
commandHash,
command,
action,
actionKey,
remoteOptions.remoteDiscardMerkleTrees);
} finally {
remoteActionBuildingSemaphore.release();
}
}

@Nullable
Expand Down Expand Up @@ -1338,7 +1369,7 @@ private void reportUploadError(Throwable error) {
* <p>Must be called before calling {@link #executeRemotely}.
*/
public void uploadInputsIfNotPresent(RemoteAction action, boolean force)
throws IOException, InterruptedException {
throws IOException, ExecException, ForbiddenActionInputException, InterruptedException {
checkState(!shutdown.get(), "shutdown");
checkState(mayBeExecutedRemotely(action.getSpawn()), "spawn can't be executed remotely");

Expand All @@ -1347,13 +1378,33 @@ public void uploadInputsIfNotPresent(RemoteAction action, boolean force)
Map<Digest, Message> additionalInputs = Maps.newHashMapWithExpectedSize(2);
additionalInputs.put(action.getActionKey().getDigest(), action.getAction());
additionalInputs.put(action.getCommandHash(), action.getCommand());
remoteExecutionCache.ensureInputsPresent(
action
.getRemoteActionExecutionContext()
.withWriteCachePolicy(CachePolicy.REMOTE_CACHE_ONLY), // Only upload to remote cache
action.getMerkleTree(),
additionalInputs,
force);

// As uploading depends on having the full input root in memory, limit
// concurrency. This prevents memory exhaustion. We assume that
// ensureInputsPresent() provides enough parallelism to saturate the
// network connection.
remoteActionBuildingSemaphore.acquire();
try {
MerkleTree merkleTree = action.getMerkleTree();
if (merkleTree == null) {
// --experimental_remote_discard_merkle_trees was provided.
// Recompute the input root.
Spawn spawn = action.getSpawn();
SpawnExecutionContext context = action.getSpawnExecutionContext();
ToolSignature toolSignature = getToolSignature(spawn, context);
merkleTree = buildInputMerkleTree(spawn, context, toolSignature);
}

remoteExecutionCache.ensureInputsPresent(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks wrong because this makes a remote call, and this is now limited to N parallel remote calls. This should have been outside the semaphore.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding was that this limits the number of merkle trees in flight preventing OOMs.

@EdSchouten was this the intention?

also cc @tjgq (and leaving a reference to #21378 here)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm specifically talking about the ensureInputsPresent call, which performs remote calls, and is now limited.

Copy link
Contributor Author

@EdSchouten EdSchouten Apr 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, it has to be limited to make sure we don't run OOM because memory is filled up with Merkle trees. Yes, this does mean that concurrency of outgoing network calls is limited as well.

One way to make this less problematic would be to implement buildInputMerkleTree() in such a way that it streams directories that it creates, and that FindMissingBlobs() is called in batches. But because buildInputMerkleTree() is not built like that, we currently don't have a lot of options here.

I guess I never observed this to be problematic for performance, because Buildbarn's FindMissingBlobs() implementation is sufficiently fast. It typically completes in ~2 milliseconds, even if 50k digests are provided.

Copy link
Contributor

@werkt werkt Apr 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Limiting this by the # of cores is nonsensical when concerned with memory exhaustion - I might as well limit by the number 42 - you're no more or less likely to create memory pressure with this based on the number of cores on a host.

We should limit by a heap pool size, charge by the weight of the merkle tree, and if it turns out that portions of that tree are shared across merkle trees (not sure if that optimization exists), then it should be the net weight of all unique merkle tree components.

Measuring this in milliseconds means that you're blocked on it - for a remote that responds in 1ms, the throughput cap is N * 1000, decided arbitrarily. Utilizing the network also means that there's a potential for async resource exhaustion there that will impact the runtime (but has nothing to do with remediating memory). The case where this was discovered was pathologically small input trees, --jobs=10000 with sub-second execution times, where bandwidth consumption, queued request processing on the channel, and the like could drastically increase the time in the critical section, through no fault of any remote service.

Copy link
Contributor Author

@EdSchouten EdSchouten Apr 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Limiting it by the number of cores is a better heuristic than limiting it by the --jobs flag. The reason being that the amount of RAM a system has tends to be proportional to the number of cores. The RAM on my system is not necessarily proportional to the number of worker threads my build cluster running on {AWS,GCP,...} has.

I'm fine with whatever limit it is. As long as there is a sane one. Ulf mentioned that this code should not have been covered by a semaphore. This is something I object to, give the fact that the Merkle tree computation code is not written in such a way that it can return partial Merkle trees.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am working on async execution (running actions in virtual thread, check --experimental_async_execution in HEAD) which might allow us to remove this limiting because the size of the underlying thread pool equals to the number of cores.

action
.getRemoteActionExecutionContext()
.withWriteCachePolicy(CachePolicy.REMOTE_CACHE_ONLY), // Only upload to remote cache
merkleTree,
additionalInputs,
force);
} finally {
remoteActionBuildingSemaphore.release();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void close() {}

private void checkForConcurrentModifications()
throws IOException, ForbiddenActionInputException {
for (ActionInput input : action.getInputMap().values()) {
for (ActionInput input : action.getInputMap(true).values()) {
if (input instanceof VirtualActionInput) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,9 +621,9 @@ private Map<Path, Long> getInputCtimes(SortedMap<PathFragment, ActionInput> inpu
SpawnResult execLocallyAndUpload(
RemoteAction action, Spawn spawn, SpawnExecutionContext context, boolean uploadLocalResults)
throws ExecException, IOException, ForbiddenActionInputException, InterruptedException {
Map<Path, Long> ctimesBefore = getInputCtimes(action.getInputMap());
Map<Path, Long> ctimesBefore = getInputCtimes(action.getInputMap(true));
SpawnResult result = execLocally(spawn, context);
Map<Path, Long> ctimesAfter = getInputCtimes(action.getInputMap());
Map<Path, Long> ctimesAfter = getInputCtimes(action.getInputMap(true));
uploadLocalResults =
uploadLocalResults && Status.SUCCESS.equals(result.status()) && result.exitCode() == 0;
if (!uploadLocalResults) {
Expand Down
Loading