diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/downloader/BUILD b/src/main/java/com/google/devtools/build/lib/bazel/repository/downloader/BUILD index 69cee4b2f4a084..f3ed273ba387a1 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/downloader/BUILD +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/downloader/BUILD @@ -23,6 +23,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/clock", "//src/main/java/com/google/devtools/build/lib/concurrent", "//src/main/java/com/google/devtools/build/lib/events", + "//src/main/java/com/google/devtools/build/lib/profiler", "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/util", "//src/main/java/com/google/devtools/build/lib/util:os", diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/downloader/DownloadManager.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/downloader/DownloadManager.java index 02656d8221f3e8..43853fdfdd6701 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/downloader/DownloadManager.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/downloader/DownloadManager.java @@ -20,9 +20,11 @@ import com.google.auth.Credentials; import com.google.common.base.MoreObjects; import com.google.common.base.Strings; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.authandtls.StaticCredentials; import com.google.devtools.build.lib.bazel.repository.cache.RepositoryCache; import com.google.devtools.build.lib.bazel.repository.cache.RepositoryCache.KeyType; @@ -30,6 +32,8 @@ import com.google.devtools.build.lib.bazel.repository.downloader.UrlRewriter.RewrittenURL; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.ExtendedEventHandler; +import com.google.devtools.build.lib.profiler.Profiler; +import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; @@ -42,6 +46,10 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import javax.annotation.Nullable; /** @@ -51,6 +59,16 @@ * to disk. */ public class DownloadManager { + private static final ExecutorService DOWNLOAD_EXECUTOR = + Executors.newFixedThreadPool( + // There is also GrpcRemoteDownloader so if we set the thread pool to the same size as + // the allowed number of HTTP downloads, it might unnecessarily block. No, this is not a + // very + // principled approach; ideally, we'd grow the thread pool as needed with some generous + // upper + // limit. + 2 * HttpDownloader.MAX_PARALLEL_DOWNLOADS, + new ThreadFactoryBuilder().setNameFormat("download-manager-%d").build()); private final RepositoryCache repositoryCache; private List distdir = ImmutableList.of(); @@ -96,6 +114,69 @@ public void setCredentialFactory(CredentialFactory credentialFactory) { this.credentialFactory = credentialFactory; } + public Future startDownload( + List originalUrls, + Map>> authHeaders, + Optional checksum, + String canonicalId, + Optional type, + Path output, + ExtendedEventHandler eventHandler, + Map clientEnv, + String context) { + return DOWNLOAD_EXECUTOR.submit( + () -> { + try (SilentCloseable c = Profiler.instance().profile("fetching: " + context)) { + return downloadInExecutor( + originalUrls, + authHeaders, + checksum, + canonicalId, + type, + output, + eventHandler, + clientEnv, + context); + } + }); + } + + public Path finalizeDownload(Future download) throws IOException, InterruptedException { + try { + return download.get(); + } catch (ExecutionException e) { + Throwables.throwIfInstanceOf(e.getCause(), IOException.class); + Throwables.throwIfInstanceOf(e.getCause(), InterruptedException.class); + Throwables.throwIfUnchecked(e.getCause()); + throw new IllegalStateException(e); + } + } + + public Path download( + List originalUrls, + Map>> authHeaders, + Optional checksum, + String canonicalId, + Optional type, + Path output, + ExtendedEventHandler eventHandler, + Map clientEnv, + String context) + throws IOException, InterruptedException { + Future future = + startDownload( + originalUrls, + authHeaders, + checksum, + canonicalId, + type, + output, + eventHandler, + clientEnv, + context); + return finalizeDownload(future); + } + /** * Downloads file to disk and returns path. * @@ -114,7 +195,7 @@ public void setCredentialFactory(CredentialFactory credentialFactory) { * @throws IOException if download was attempted and ended up failing * @throws InterruptedException if this thread is being cast into oblivion */ - public Path download( + private Path downloadInExecutor( List originalUrls, Map>> authHeaders, Optional checksum, diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/downloader/HttpDownloader.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/downloader/HttpDownloader.java index 1cc14c1dd183c8..3e9e0f150a6ac8 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/downloader/HttpDownloader.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/downloader/HttpDownloader.java @@ -47,7 +47,8 @@ * file to disk. */ public class HttpDownloader implements Downloader { - private static final int MAX_PARALLEL_DOWNLOADS = 8; + static final int MAX_PARALLEL_DOWNLOADS = 8; + private static final Semaphore SEMAPHORE = new Semaphore(MAX_PARALLEL_DOWNLOADS, true); private static final Clock CLOCK = new JavaClock(); private static final Sleeper SLEEPER = new JavaSleeper(); diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkBaseExternalContext.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkBaseExternalContext.java index f34fb44603f63d..1374306b6be39c 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkBaseExternalContext.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkBaseExternalContext.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Futures; import com.google.devtools.build.lib.actions.FileValue; import com.google.devtools.build.lib.bazel.debug.WorkspaceRuleEvent; import com.google.devtools.build.lib.bazel.repository.DecompressorDescriptor; @@ -34,6 +35,8 @@ import com.google.devtools.build.lib.bazel.repository.downloader.DownloadManager; import com.google.devtools.build.lib.bazel.repository.downloader.HttpUtils; import com.google.devtools.build.lib.cmdline.Label; +import com.google.devtools.build.lib.events.Event; +import com.google.devtools.build.lib.events.EventHandler; import com.google.devtools.build.lib.events.ExtendedEventHandler.FetchProgress; import com.google.devtools.build.lib.packages.StarlarkInfo; import com.google.devtools.build.lib.packages.StructImpl; @@ -77,12 +80,16 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import javax.annotation.Nullable; import net.starlark.java.annot.Param; import net.starlark.java.annot.ParamType; import net.starlark.java.annot.StarlarkMethod; import net.starlark.java.eval.Dict; import net.starlark.java.eval.EvalException; +import net.starlark.java.eval.Printer; import net.starlark.java.eval.Sequence; import net.starlark.java.eval.Starlark; import net.starlark.java.eval.StarlarkInt; @@ -93,6 +100,31 @@ /** A common base class for Starlark "ctx" objects related to external dependencies. */ public abstract class StarlarkBaseExternalContext implements StarlarkValue { + + /** + * An asynchronous task run as part of fetching the repository. + * + *

The main property of such tasks is that they should under no circumstances keep running + * after fetching the repository is finished, whether successfully or not. To this end, the {@link + * #cancel()} method must stop all such work. + */ + private interface AsyncTask { + /** Returns a user-friendly description of the task. */ + String getDescription(); + + /** Returns where the task was started from. */ + Location getLocation(); + + /** + * Cancels the task, if not done yet. Returns false if the task was still in progress. + * + *

No means of error reporting is provided. Any errors should be reported by other means. The + * only possible error reported as a consequence of calling this method is one that tells the + * user that they didn't wait for an async task they should have waited for. + */ + boolean cancel(); + } + /** Max. number of command line args added as a profiler description. */ private static final int MAX_PROFILE_ARGS_LEN = 80; @@ -106,6 +138,7 @@ public abstract class StarlarkBaseExternalContext implements StarlarkValue { protected final StarlarkSemantics starlarkSemantics; private final HashMap accumulatedFileDigests = new HashMap<>(); private final RepositoryRemoteExecutor remoteExecutor; + private final List asyncTasks; protected StarlarkBaseExternalContext( Path workingDirectory, @@ -125,6 +158,31 @@ protected StarlarkBaseExternalContext( this.processWrapper = processWrapper; this.starlarkSemantics = starlarkSemantics; this.remoteExecutor = remoteExecutor; + this.asyncTasks = new ArrayList<>(); + } + + public boolean ensureNoPendingAsyncTasks(EventHandler eventHandler, boolean forSuccessfulFetch) { + boolean hadPendingItems = false; + for (AsyncTask task : asyncTasks) { + if (!task.cancel()) { + hadPendingItems = true; + if (forSuccessfulFetch) { + eventHandler.handle( + Event.error( + task.getLocation(), + "Work pending after repository rule finished execution: " + + task.getDescription())); + } + } + } + + return hadPendingItems; + } + + // There is no unregister(). We don't have that many futures in each repository and it just + // introduces the failure mode of erroneously unregistering async work that's not done. + protected void registerAsyncTask(AsyncTask task) { + asyncTasks.add(task); } /** A string that can be used to identify this context object. Used for logging purposes. */ @@ -363,6 +421,101 @@ private StructImpl calculateDownloadResult(Optional checksum, Path dow return StarlarkInfo.create(StructProvider.STRUCT, out.buildOrThrow(), Location.BUILTIN); } + private class PendingDownload implements StarlarkValue, AsyncTask { + private final boolean executable; + private final boolean allowFail; + private final StarlarkPath outputPath; + private final Optional checksum; + private final RepositoryFunctionException checksumValidation; + private final Future future; + private final Location location; + + private PendingDownload( + boolean executable, + boolean allowFail, + StarlarkPath outputPath, + Optional checksum, + RepositoryFunctionException checksumValidation, + Future future, + Location location) { + this.executable = executable; + this.allowFail = allowFail; + this.outputPath = outputPath; + this.checksum = checksum; + this.checksumValidation = checksumValidation; + this.future = future; + this.location = location; + } + + @Override + public String getDescription() { + return String.format("downloading to '%s'", outputPath); + } + + @Override + public Location getLocation() { + return location; + } + + @Override + public boolean cancel() { + if (!future.cancel(true)) { + return true; + } + + try { + future.get(); + return false; + } catch (InterruptedException | ExecutionException | CancellationException e) { + // Ignore. The only thing we care about is that there is no async work in progress after + // this point. Any error reporting should have been done before. + return false; + } + } + + @StarlarkMethod( + name = "wait", + doc = + "Blocks until the completion of the download and returns or throws as blocking " + + " download() call would") + public StructImpl await() throws InterruptedException, RepositoryFunctionException { + return completeDownload(this); + } + + @Override + public void repr(Printer printer) { + printer.append(String.format("", outputPath)); + } + } + + private StructImpl completeDownload(PendingDownload pendingDownload) + throws RepositoryFunctionException, InterruptedException { + Path downloadedPath; + try { + downloadedPath = downloadManager.finalizeDownload(pendingDownload.future); + if (pendingDownload.executable) { + pendingDownload.outputPath.getPath().setExecutable(true); + } + } catch (IOException e) { + if (pendingDownload.allowFail) { + return StarlarkInfo.create( + StructProvider.STRUCT, ImmutableMap.of("success", false), Location.BUILTIN); + } else { + throw new RepositoryFunctionException(e, Transience.TRANSIENT); + } + } catch (InvalidPathException e) { + throw new RepositoryFunctionException( + Starlark.errorf( + "Could not create output path %s: %s", pendingDownload.outputPath, e.getMessage()), + Transience.PERSISTENT); + } + if (pendingDownload.checksumValidation != null) { + throw pendingDownload.checksumValidation; + } + + return calculateDownloadResult(pendingDownload.checksum, downloadedPath); + } + @StarlarkMethod( name = "download", doc = @@ -435,8 +588,18 @@ private StructImpl calculateDownloadResult(Optional checksum, Path dow + " risk to omit the checksum as remote files can change. At best omitting this" + " field will make your build non-hermetic. It is optional to make development" + " easier but should be set before shipping."), + @Param( + name = "block", + defaultValue = "True", + named = true, + positional = false, + doc = + "If set to false, the call returns immediately and instead of the regular return" + + " value, it returns a token with one single method, wait(), which blocks" + + " until the download is finished and returns the usual return value or" + + " throws as usual.") }) - public StructImpl download( + public Object download( Object url, Object output, String sha256, @@ -445,17 +608,20 @@ public StructImpl download( String canonicalId, Dict authUnchecked, // expected String integrity, + Boolean block, StarlarkThread thread) throws RepositoryFunctionException, EvalException, InterruptedException { + PendingDownload download = null; ImmutableMap>> authHeaders = getAuthHeaders(getAuthContents(authUnchecked, "auth")); ImmutableList urls = getUrls( url, - /*ensureNonEmpty=*/ !allowFail, - /*checksumGiven=*/ !Strings.isNullOrEmpty(sha256) || !Strings.isNullOrEmpty(integrity)); - Optional checksum; + /* ensureNonEmpty= */ !allowFail, + /* checksumGiven= */ !Strings.isNullOrEmpty(sha256) + || !Strings.isNullOrEmpty(integrity)); + Optional checksum = null; RepositoryFunctionException checksumValidation = null; try { checksum = validateChecksum(sha256, integrity, urls); @@ -475,13 +641,24 @@ public StructImpl download( getIdentifyingStringForLogging(), thread.getCallerLocation()); env.getListener().post(w); - Path downloadedPath; - try (SilentCloseable c = - Profiler.instance().profile("fetching: " + getIdentifyingStringForLogging())) { + + try { checkInOutputDirectory("write", outputPath); makeDirectories(outputPath.getPath()); - downloadedPath = - downloadManager.download( + } catch (IOException e) { + download = + new PendingDownload( + executable, + allowFail, + outputPath, + checksum, + checksumValidation, + Futures.immediateFailedFuture(e), + thread.getCallerLocation()); + } + if (download == null) { + Future downloadFuture = + downloadManager.startDownload( urls, authHeaders, checksum, @@ -491,26 +668,22 @@ public StructImpl download( env.getListener(), envVariables, getIdentifyingStringForLogging()); - if (executable) { - outputPath.getPath().setExecutable(true); - } - } catch (IOException e) { - if (allowFail) { - return StarlarkInfo.create( - StructProvider.STRUCT, ImmutableMap.of("success", false), Location.BUILTIN); - } else { - throw new RepositoryFunctionException(e, Transience.TRANSIENT); - } - } catch (InvalidPathException e) { - throw new RepositoryFunctionException( - Starlark.errorf("Could not create output path %s: %s", outputPath, e.getMessage()), - Transience.PERSISTENT); + download = + new PendingDownload( + executable, + allowFail, + outputPath, + checksum, + checksumValidation, + downloadFuture, + thread.getCallerLocation()); + registerAsyncTask(download); } - if (checksumValidation != null) { - throw checksumValidation; + if (!block) { + return download; + } else { + return completeDownload(download); } - - return calculateDownloadResult(checksum, downloadedPath); } @StarlarkMethod( @@ -669,17 +842,15 @@ public StructImpl downloadAndExtract( Path downloadedPath; Path downloadDirectory; - try (SilentCloseable c = - Profiler.instance().profile("fetching: " + getIdentifyingStringForLogging())) { - + try { // Download to temp directory inside the outputDirectory and delete it after extraction java.nio.file.Path tempDirectory = Files.createTempDirectory(Paths.get(outputPath.toString()), "temp"); downloadDirectory = workingDirectory.getFileSystem().getPath(tempDirectory.toFile().getAbsolutePath()); - downloadedPath = - downloadManager.download( + Future pendingDownload = + downloadManager.startDownload( urls, authHeaders, checksum, @@ -689,6 +860,7 @@ public StructImpl downloadAndExtract( env.getListener(), envVariables, getIdentifyingStringForLogging()); + downloadedPath = downloadManager.finalizeDownload(pendingDownload); } catch (IOException e) { env.getListener().post(w); if (allowFail) { diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java index b3aabda85f2157..db616741fbdbbc 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java @@ -282,6 +282,7 @@ private RepositoryDirectoryValue.Builder fetchInternal( // it possible to return null and not block but it doesn't seem to be easy with Starlark // structure as it is. Object result; + boolean fetchSuccessful = false; try (SilentCloseable c = Profiler.instance() .profile(ProfilerTask.STARLARK_REPOSITORY_FN, () -> rule.getLabel().toString())) { @@ -291,7 +292,19 @@ private RepositoryDirectoryValue.Builder fetchInternal( function, /*args=*/ ImmutableList.of(starlarkRepositoryContext), /*kwargs=*/ ImmutableMap.of()); + fetchSuccessful = true; + } finally { + if (starlarkRepositoryContext.ensureNoPendingAsyncTasks( + env.getListener(), fetchSuccessful)) { + if (fetchSuccessful) { + throw new RepositoryFunctionException( + new EvalException( + "Pending asynchronous work after repository rule finished running"), + Transience.PERSISTENT); + } + } } + RepositoryResolvedEvent resolved = new RepositoryResolvedEvent( rule, starlarkRepositoryContext.getAttr(), outputDirectory, result); diff --git a/src/test/shell/bazel/external_integration_test.sh b/src/test/shell/bazel/external_integration_test.sh index 83223138173ed7..e378a7074ccc4b 100755 --- a/src/test/shell/bazel/external_integration_test.sh +++ b/src/test/shell/bazel/external_integration_test.sh @@ -49,6 +49,9 @@ EOF tear_down() { shutdown_server + if [ -d "${TEST_TMPDIR}/server_dir" ]; then + rm -fr "${TEST_TMPDIR}/server_dir" + fi } function zip_up() { @@ -410,6 +413,188 @@ EOF expect_log "404 Not Found" } +function test_deferred_download_unwaited() { + cat >> $(create_workspace_with_default_repos WORKSPACE) <<'EOF' +load("hang.bzl", "hang") + +hang(name="hang") +EOF + + cat > hang.bzl <<'EOF' +def _hang_impl(rctx): + hangs = rctx.download( + # This URL will definitely not work, but that's OK -- we don't need a + # successful request for this test + url = "https://127.0.0.1:0/does_not_exist", + output = "does_not_exist", + block = False) + +hang = repository_rule(implementation = _hang_impl) +EOF + + touch BUILD + bazel query @hang//:all >& $TEST_log && fail "Bazel unexpectedly succeeded" + expect_log "Pending asynchronous work" +} + +function test_deferred_download_two_parallel_downloads() { + local server_dir="${TEST_TMPDIR}/server_dir" + local gate_socket="${server_dir}/gate_socket" + local served_apple="APPLE" + local served_banana="BANANA" + local apple_sha256=$(echo "${served_apple}" | sha256sum | cut -f1 -d' ') + local banana_sha256=$(echo "${served_banana}" | sha256sum | cut -f1 -d' ') + + mkdir -p "${server_dir}" + + mkfifo "${server_dir}/apple" || fail "cannot mkfifo" + mkfifo "${server_dir}/banana" || fail "cannot mkfifo" + mkfifo $gate_socket || fail "cannot mkfifo" + + startup_server "${server_dir}" + + cat >> $(create_workspace_with_default_repos WORKSPACE) <<'EOF' +load("defer.bzl", "defer") + +defer(name="defer") +EOF + + cat > defer.bzl < ${server_dir}/gate_socket"]) + + # Wait until the requess are done + [p.wait() for p in pending] + + rctx.file("WORKSPACE", "") + rctx.file("BUILD", "filegroup(name='f', srcs=glob(['**']))") + +defer = repository_rule(implementation = _defer_impl) +EOF + + touch BUILD + + # Start Bazel + bazel query @defer//:all >& $TEST_log & + local bazel_pid=$! + + # Wait until the .download() calls return + cat "${server_dir}/gate_socket" + + # Tell the test server the strings it should serve. In parallel because the + # test server apparently cannot serve two HTTP requests in parallel, so if we + # wait for request A to be completely served while unblocking request B, it is + # possible that the test server wants to serve request B first, which is a + # deadlock. + echo "${served_apple}" > "${server_dir}/apple" & + local apple_pid=$! + echo "${served_banana}" > "${server_dir}/banana" & + local banana_pid=$! + wait $apple_pid + wait $banana_pid + + # Wait until Bazel returns + wait "${bazel_pid}" || fail "Bazel failed" + expect_log "@defer//:f" +} + +function test_deferred_download_error() { + cat >> $(create_workspace_with_default_repos WORKSPACE) <<'EOF' +load("defer.bzl", "defer") + +defer(name="defer") +EOF + + cat > defer.bzl <& $TEST_log && fail "Bazel unexpectedly succeeded" + expect_log "Error downloading.*doesnotexist" + expect_not_log "survived wait" +} + +function test_deferred_download_smoke() { + local server_dir="${TEST_TMPDIR}/server_dir" + local served_socket="${server_dir}/served_socket" + local gate_socket="${server_dir}/gate_socket" + local served_string="DEFERRED" + local served_sha256=$(echo "${served_string}" | sha256sum | cut -f1 -d' ') + + mkdir -p "${server_dir}" + + mkfifo $served_socket || fail "cannot mkfifo" + mkfifo $gate_socket || fail "cannot mkfifo" + + startup_server "${server_dir}" + + cat >> $(create_workspace_with_default_repos WORKSPACE) <<'EOF' +load("defer.bzl", "defer") + +defer(name="defer") +EOF + + cat > defer.bzl < ${server_dir}/gate_socket"]) + deferred.wait() + rctx.file("WORKSPACE", "") + rctx.file("BUILD", "filegroup(name='f', srcs=glob(['**']))") + +defer = repository_rule(implementation = _defer_impl) +EOF + + touch BUILD + + # Start Bazel + bazel query @defer//:all-targets >& $TEST_log & + local bazel_pid=$! + + # Wait until the .download() call returns + cat "${server_dir}/gate_socket" + + # Tell the test server the string it should serve + echo "${served_string}" > "${server_dir}/served_socket" + + # Wait until Bazel returns + wait "${bazel_pid}" || fail "Bazel failed" + expect_log "@defer//:deferred" +} + # Tests downloading a file and using it as a dependency. function test_http_download() { local test_file=$TEST_TMPDIR/toto