Skip to content

Commit

Permalink
Decouple Skyframe worker thread stuff from repo fetching, part 1
Browse files Browse the repository at this point in the history
Today, the code we have for using worker threads for repo fetching is tightly coupled with repo fetching. This doesn't need to be the case. This CL refactors some code in repo fetching so that eventually we can reuse the worker thread stuff for other use cases (for example, module extension eval).

Most notably, `RepoFetchingSkyKeyComputeState` has a `recordedInputValues` field. This is conceptually an "output" of the repo fetching stage. By making `RepositoryFunction#fetch` _return_ a map of `recordedInputValues`, rather than taking such a map as an argument and writing into it, we can completely remove the `recordedInputValues` field from `RepoFetchingSkyKeyComputeState`.

This allows us to parameterize the return type of the worker thread future, and thus rename `RepoFetchingSkyKeyComputeState` to just `WorkerSkyKeyComputeState`, and `RepoFetchingWorkerSkyFunctionEnvironment` to just `WorkerSkyFunctionEnvironment`. At this point, these two classes already have nothing to do with repo fetching, and can be moved to a shareable location.

In a follow-up CL, I plan to refactor the code further such that some of the code interacting with these two classes in `StarlarkRepositoryFunction#fetch` can be moved into the two `Worker*` classes as well, so that the user of worker threads can enjoy a cleaner API.

Work towards #22729

PiperOrigin-RevId: 669024825
Change-Id: Ic1529a39ca81096812eff5e0fd74622b2ef0b68f
  • Loading branch information
Wyverald committed Sep 3, 2024
1 parent decd67e commit 6413f92
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.devtools.build.lib.bazel.ResolvedEvent;
import com.google.devtools.build.lib.packages.Rule;
import com.google.devtools.build.lib.packages.semantics.BuildLanguageOptions;
import com.google.devtools.build.lib.rules.repository.RepoRecordedInput;
import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue;
import com.google.devtools.build.lib.rules.repository.RepositoryFunction;
import com.google.devtools.build.lib.rules.repository.ResolvedFileValue;
Expand All @@ -31,7 +30,6 @@
import com.google.devtools.build.skyframe.SkyFunctionException.Transience;
import com.google.devtools.build.skyframe.SkyKey;
import java.io.IOException;
import java.util.Map;
import net.starlark.java.eval.StarlarkSemantics;

/** Create a local repository that describes the auto-detected host platform. */
Expand All @@ -48,13 +46,8 @@ public Class<? extends RuleDefinition> getRuleDefinition() {
}

@Override
public RepositoryDirectoryValue.Builder fetch(
Rule rule,
Path outputDirectory,
BlazeDirectories directories,
Environment env,
Map<RepoRecordedInput, String> recordedInputValues,
SkyKey key)
public FetchResult fetch(
Rule rule, Path outputDirectory, BlazeDirectories directories, Environment env, SkyKey key)
throws RepositoryFunctionException, InterruptedException {
ensureNativeRepoRuleEnabled(rule, env, "the platform defined at @platforms//host");
StarlarkSemantics starlarkSemantics = PrecomputedValue.STARLARK_SEMANTICS.get(env);
Expand Down Expand Up @@ -104,7 +97,8 @@ public Object getResolvedInformation(XattrProvider xattrProvider) {
});

// Return the needed info.
return RepositoryDirectoryValue.builder().setPath(outputDirectory);
return new FetchResult(
RepositoryDirectoryValue.builder().setPath(outputDirectory), ImmutableMap.of());
}

private static String workspaceFileContent(String repositoryName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@
import com.google.devtools.build.skyframe.SkyFunctionException.Transience;
import com.google.devtools.build.skyframe.SkyKey;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -121,40 +123,31 @@ public void reportSkyframeRestart(Environment env, RepositoryName repoName) {
}

private record FetchArgs(
Rule rule,
Path outputDirectory,
BlazeDirectories directories,
Environment env,
Map<RepoRecordedInput, String> recordedInputValues,
SkyKey key) {
FetchArgs toWorkerArgs(Environment env, Map<RepoRecordedInput, String> recordedInputValues) {
return new FetchArgs(rule, outputDirectory, directories, env, recordedInputValues, key);
Rule rule, Path outputDirectory, BlazeDirectories directories, Environment env, SkyKey key) {
FetchArgs toWorkerArgs(Environment env) {
return new FetchArgs(rule, outputDirectory, directories, env, key);
}
}

@Nullable
@Override
public RepositoryDirectoryValue.Builder fetch(
Rule rule,
Path outputDirectory,
BlazeDirectories directories,
Environment env,
Map<RepoRecordedInput, String> recordedInputValues,
SkyKey key)
public FetchResult fetch(
Rule rule, Path outputDirectory, BlazeDirectories directories, Environment env, SkyKey key)
throws RepositoryFunctionException, InterruptedException {
var args = new FetchArgs(rule, outputDirectory, directories, env, recordedInputValues, key);
var args = new FetchArgs(rule, outputDirectory, directories, env, key);
if (!useWorkers) {
return fetchInternal(args);
}
// See below (the `catch CancellationException` clause) for why there's a `while` loop here.
while (true) {
var state = env.getState(() -> new RepoFetchingSkyKeyComputeState(rule.getName()));
ListenableFuture<RepositoryDirectoryValue.Builder> workerFuture =
var state = env.getState(WorkerSkyKeyComputeState<FetchResult>::new);
ListenableFuture<FetchResult> workerFuture =
state.getOrStartWorker(
"starlark-repository-" + rule.getName(),
() -> {
Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state);
Environment workerEnv = new WorkerSkyFunctionEnvironment(state);
setupRepoRoot(outputDirectory);
return fetchInternal(args.toWorkerArgs(workerEnv, state.recordedInputValues));
return fetchInternal(args.toWorkerArgs(workerEnv));
});
try {
state.delegateEnvQueue.put(env);
Expand All @@ -164,9 +157,7 @@ public RepositoryDirectoryValue.Builder fetch(
// null to trigger a Skyframe restart, but *don't* shut down the worker executor.
return null;
}
RepositoryDirectoryValue.Builder result = workerFuture.get();
recordedInputValues.putAll(state.recordedInputValues);
return result;
return workerFuture.get();
} catch (ExecutionException e) {
Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class);
Throwables.throwIfUnchecked(e.getCause());
Expand Down Expand Up @@ -196,32 +187,22 @@ public RepositoryDirectoryValue.Builder fetch(
}

@Nullable
private RepositoryDirectoryValue.Builder fetchInternal(FetchArgs args)
private FetchResult fetchInternal(FetchArgs args)
throws RepositoryFunctionException, InterruptedException {
return fetchInternal(
args.rule,
args.outputDirectory,
args.directories,
args.env,
args.recordedInputValues,
args.key);
return fetchInternal(args.rule, args.outputDirectory, args.directories, args.env, args.key);
}

@Nullable
private RepositoryDirectoryValue.Builder fetchInternal(
Rule rule,
Path outputDirectory,
BlazeDirectories directories,
Environment env,
Map<RepoRecordedInput, String> recordedInputValues,
SkyKey key)
private FetchResult fetchInternal(
Rule rule, Path outputDirectory, BlazeDirectories directories, Environment env, SkyKey key)
throws RepositoryFunctionException, InterruptedException {

String defInfo = RepositoryResolvedEvent.getRuleDefinitionInformation(rule);
env.getListener().post(new StarlarkRepositoryDefinitionLocationEvent(rule.getName(), defInfo));

StarlarkCallable function = rule.getRuleClassObject().getConfiguredTargetFunction();
if (declareEnvironmentDependencies(recordedInputValues, env, getEnviron(rule)) == null) {
ImmutableMap<String, Optional<String>> envVarValues = getEnvVarValues(env, getEnviron(rule));
if (envVarValues == null) {
return null;
}
StarlarkSemantics starlarkSemantics = PrecomputedValue.STARLARK_SEMANTICS.get(env);
Expand Down Expand Up @@ -262,6 +243,7 @@ private RepositoryDirectoryValue.Builder fetchInternal(
}
ImmutableSet<PathFragment> ignoredPatterns = checkNotNull(ignoredPackagesValue).getPatterns();

Map<RepoRecordedInput, String> recordedInputValues = new LinkedHashMap<>();
try (Mutability mu = Mutability.create("Starlark repository");
StarlarkRepositoryContext starlarkRepositoryContext =
new StarlarkRepositoryContext(
Expand Down Expand Up @@ -332,6 +314,8 @@ private RepositoryDirectoryValue.Builder fetchInternal(

// Modify marker data to include the files/dirents/env vars used by the rule's implementation
// function.
recordedInputValues.putAll(
Maps.transformValues(RepoRecordedInput.EnvVar.wrap(envVarValues), v -> v.orElse(null)));
recordedInputValues.putAll(starlarkRepositoryContext.getRecordedFileInputs());
recordedInputValues.putAll(starlarkRepositoryContext.getRecordedDirentsInputs());
recordedInputValues.putAll(starlarkRepositoryContext.getRecordedDirTreeInputs());
Expand Down Expand Up @@ -395,7 +379,8 @@ private RepositoryDirectoryValue.Builder fetchInternal(
}
}

return RepositoryDirectoryValue.builder().setPath(outputDirectory);
return new FetchResult(
RepositoryDirectoryValue.builder().setPath(outputDirectory), recordedInputValues);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,24 @@
import javax.annotation.Nullable;

/**
* A {@link SkyFunction.Environment} implementation designed to be used in a different thread than
* the corresponding SkyFunction runs in. It relies on a delegate Environment object to do
* underlying work. Its {@link #getValue} and {@link #getValueOrThrow} methods do not return {@code
* null} when the {@link SkyValue} in question is not available. Instead, it blocks and waits for
* the host Skyframe thread to restart, and replaces the delegate Environment with a fresh one from
* the restarted SkyFunction before continuing. (Note that those methods <em>do</em> return {@code
* null} if the SkyValue was evaluated but found to be in error.)
* A {@link SkyFunction.Environment} implementation designed to be used in a different thread (the
* "worker thread") than the corresponding SkyFunction runs in. It relies on a delegate Environment
* object to do underlying work. Its {@link #getValue} and {@link #getValueOrThrow} methods do not
* return {@code null} when the {@link SkyValue} in question is not available. Instead, it blocks
* and waits for the host Skyframe thread to restart, and replaces the delegate Environment with a
* fresh one from the restarted SkyFunction before continuing. (Note that those methods <em>do</em>
* return {@code null} if the SkyValue was evaluated but found to be in error.)
*
* <p>Crucially, the delegate Environment object must not be used by multiple threads at the same
* time. In effect, this is guaranteed by only one of the worker thread and host thread being active
* at any given time.
*/
class RepoFetchingWorkerSkyFunctionEnvironment
class WorkerSkyFunctionEnvironment
implements SkyFunction.Environment, ExtendedEventHandler, SkyframeLookupResult {
private final RepoFetchingSkyKeyComputeState state;
private final WorkerSkyKeyComputeState<?> state;
private SkyFunction.Environment delegate;

RepoFetchingWorkerSkyFunctionEnvironment(RepoFetchingSkyKeyComputeState state)
throws InterruptedException {
WorkerSkyFunctionEnvironment(WorkerSkyKeyComputeState<?> state) throws InterruptedException {
this.state = state;
this.delegate = state.delegateEnvQueue.take();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.rules.repository.RepoRecordedInput;
import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue;
import com.google.devtools.build.skyframe.SkyFunction;
import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
Expand All @@ -34,15 +30,20 @@
import javax.annotation.concurrent.GuardedBy;

/**
* Captures state that persists across different invocations of {@link
* com.google.devtools.build.lib.rules.repository.RepositoryDelegatorFunction}, specifically {@link
* StarlarkRepositoryFunction}.
* A {@link SkyKeyComputeState} that manages a non-Skyframe virtual worker thread that persists
* across different invocations of a SkyFunction.
*
* <p>This class is used to hold on to a worker thread when fetching repos using a worker thread is
* enabled. The worker thread uses a {@link SkyFunction.Environment} object acquired from the host
* thread, and can signal the host thread to restart to get a fresh environment object.
* <p>The worker thread uses a {@link SkyFunction.Environment} object acquired from the host thread.
* When a new Skyframe dependency is needed, the worker thread itself does not need to restart;
* instead, it can signal the host thread to restart to get a fresh Environment object.
*
* <p>Similar to other implementations of {@link SkyKeyComputeState}, this avoids redoing expensive
* work when a new Skyframe dependency is needed; but because it holds on to an entire worker
* thread, this class is more suited to cases where the intermediate result of expensive work cannot
* be easily serialized (in particular, if there's an ongoing Starlark evaluation, as is the case in
* repo fetching).
*/
class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState {
class WorkerSkyKeyComputeState<T> implements SkyKeyComputeState {

/**
* A semaphore with 0 or 1 permit. The worker can release a permit either when it's finished
Expand All @@ -69,7 +70,7 @@ class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState {
*/
@GuardedBy("this")
@Nullable
private ListenableFuture<RepositoryDirectoryValue.Builder> workerFuture = null;
private ListenableFuture<T> workerFuture = null;

/** The executor service that manages the worker thread. */
// We hold on to this alongside `workerFuture` because it offers a convenient mechanism to make
Expand All @@ -78,20 +79,6 @@ class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState {
@Nullable
private ListeningExecutorService workerExecutorService = null;

private final String repoName;

/**
* This is where the recorded inputs & values for the whole invocation is collected.
*
* <p>{@link com.google.devtools.build.lib.rules.repository.RepositoryDelegatorFunction} creates a
* new map on each restart, so we can't simply plumb that in.
*/
final Map<RepoRecordedInput, String> recordedInputValues = new TreeMap<>();

RepoFetchingSkyKeyComputeState(String repoName) {
this.repoName = repoName;
}

/**
* Releases a permit on the {@code signalSemaphore} and immediately expect a fresh Environment
* back. This may only be called from the worker thread.
Expand All @@ -107,8 +94,7 @@ SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
* when the worker finishes, successfully or otherwise. This may only be called from the host
* Skyframe thread.
*/
synchronized ListenableFuture<RepositoryDirectoryValue.Builder> getOrStartWorker(
Callable<RepositoryDirectoryValue.Builder> c) {
synchronized ListenableFuture<T> getOrStartWorker(String workerThreadName, Callable<T> c) {
if (workerFuture != null) {
return workerFuture;
}
Expand All @@ -119,10 +105,9 @@ synchronized ListenableFuture<RepositoryDirectoryValue.Builder> getOrStartWorker
workerExecutorService =
MoreExecutors.listeningDecorator(
Executors.newThreadPerTaskExecutor(
Thread.ofVirtual().name("starlark-repository-" + repoName).factory()));
Thread.ofVirtual().name(workerThreadName).factory()));
signalSemaphore.drainPermits();
delegateEnvQueue.clear();
recordedInputValues.clear();

// Start the worker.
workerFuture = workerExecutorService.submit(c);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.devtools.build.lib.actions.FileValue;
import com.google.devtools.build.lib.analysis.BlazeDirectories;
import com.google.devtools.build.lib.analysis.RuleDefinition;
Expand Down Expand Up @@ -53,6 +54,7 @@
import java.io.InputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import javax.annotation.Nullable;
import net.starlark.java.eval.EvalException;
Expand Down Expand Up @@ -193,17 +195,15 @@ public boolean verifyRecordedInputs(

@Override
@Nullable
public RepositoryDirectoryValue.Builder fetch(
public FetchResult fetch(
Rule rule,
final Path outputDirectory,
BlazeDirectories directories,
Environment env,
Map<RepoRecordedInput, String> recordedInputValues,
SkyKey key)
throws RepositoryFunctionException, InterruptedException {
ensureNativeRepoRuleEnabled(rule, env, "https://github.com/bazelbuild/rules_android");
Map<String, String> environ =
declareEnvironmentDependencies(recordedInputValues, env, PATH_ENV_VAR_AS_SET);
ImmutableMap<String, Optional<String>> environ = getEnvVarValues(env, PATH_ENV_VAR_AS_SET);
if (environ == null) {
return null;
}
Expand All @@ -216,19 +216,22 @@ public RepositoryDirectoryValue.Builder fetch(
WorkspaceAttributeMapper attributes = WorkspaceAttributeMapper.of(rule);
FileSystem fs = directories.getOutputBase().getFileSystem();
Path androidSdkPath;
String userDefinedPath = null;
String userDefinedPath;
if (attributes.isAttributeValueExplicitlySpecified("path")) {
userDefinedPath = getPathAttr(rule);
androidSdkPath = fs.getPath(getTargetPath(userDefinedPath, directories.getWorkspace()));
} else if (environ.get(PATH_ENV_VAR) != null) {
userDefinedPath = environ.get(PATH_ENV_VAR);
} else if (environ.getOrDefault(PATH_ENV_VAR, Optional.empty()).isPresent()) {
userDefinedPath = environ.get(PATH_ENV_VAR).get();
Path workspace = directories.getWorkspace();
androidSdkPath =
fs.getPath(getAndroidHomeEnvironmentVar(directories.getWorkspace(), environ));
fs.getPath(workspace.getRelative(PathFragment.create(userDefinedPath)).asFragment());
} else {
// Write an empty BUILD file that declares errors when referred to.
String buildFile = getStringResource("android_sdk_repository_empty_template.txt");
writeBuildFile(outputDirectory, buildFile);
return RepositoryDirectoryValue.builder().setPath(outputDirectory);
return new FetchResult(
RepositoryDirectoryValue.builder().setPath(outputDirectory),
Maps.transformValues(RepoRecordedInput.EnvVar.wrap(environ), v -> v.orElse(null)));
}

if (!symlinkLocalRepositoryContents(outputDirectory, androidSdkPath, userDefinedPath)) {
Expand Down Expand Up @@ -355,19 +358,16 @@ public RepositoryDirectoryValue.Builder fetch(
}

writeBuildFile(outputDirectory, buildFile);
return RepositoryDirectoryValue.builder().setPath(outputDirectory);
return new FetchResult(
RepositoryDirectoryValue.builder().setPath(outputDirectory),
Maps.transformValues(RepoRecordedInput.EnvVar.wrap(environ), v -> v.orElse(null)));
}

@Override
public Class<? extends RuleDefinition> getRuleDefinition() {
return AndroidSdkRepositoryRule.class;
}

private static PathFragment getAndroidHomeEnvironmentVar(
Path workspace, Map<String, String> env) {
return workspace.getRelative(PathFragment.create(env.get(PATH_ENV_VAR))).asFragment();
}

private static String getStringResource(String name) {
try {
return ResourceFileLoader.loadResource(AndroidSdkRepositoryFunction.class, name);
Expand Down
Loading

0 comments on commit 6413f92

Please sign in to comment.