diff --git a/nix/bazel-retry-cache.patch b/nix/bazel-retry-cache.patch index 7f97692f5db2..ec6c44d6d6de 100644 --- a/nix/bazel-retry-cache.patch +++ b/nix/bazel-retry-cache.patch @@ -67,10 +67,30 @@ index 57741a8f28..6673149a20 100644 workingDirectory, diskCachePath, diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java -index 350e1afa51..cf5e97c3b5 100644 +index 350e1afa51..937dddd1b9 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java -@@ -213,7 +213,16 @@ public final class RemoteModule extends BlazeModule { +@@ -59,9 +59,11 @@ import com.google.devtools.build.lib.exec.ModuleActionContextRegistry; + import com.google.devtools.build.lib.exec.SpawnStrategyRegistry; + import com.google.devtools.build.lib.packages.TargetUtils; + import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement; ++import com.google.devtools.build.lib.remote.common.CacheNotFoundException; + import com.google.devtools.build.lib.remote.common.RemoteCacheClient; + import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; + import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader; ++import com.google.devtools.build.lib.remote.http.HttpException; + import com.google.devtools.build.lib.remote.logging.LoggingInterceptor; + import com.google.devtools.build.lib.remote.options.RemoteOptions; + import com.google.devtools.build.lib.remote.options.RemoteOutputsMode; +@@ -95,6 +97,7 @@ import io.grpc.ClientInterceptor; + import io.grpc.ManagedChannel; + import io.reactivex.rxjava3.plugins.RxJavaPlugins; + import java.io.IOException; ++import java.nio.channels.ClosedChannelException; + import java.util.HashSet; + import java.util.List; + import java.util.Map; +@@ -213,7 +216,26 @@ public final class RemoteModule extends BlazeModule { remoteOptions, creds, Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory"), @@ -79,8 +99,18 @@ index 350e1afa51..cf5e97c3b5 100644 + new RemoteRetrier( + remoteOptions, + (e) -> { ++ boolean retry = false; ++ if (e instanceof ClosedChannelException) { ++ retry = true; ++ } else if (e instanceof HttpException) { ++ retry = true; ++ } ++ if (retry) { + System.err.println("RETRYING: " + e.toString()); -+ return true; ++ } else if (!(e instanceof CacheNotFoundException)) { ++ System.err.println("NOT RETRYING: " + e.toString()); ++ } ++ return retry; + }, + retryScheduler, + Retrier.ALLOW_ALL_CALLS) @@ -100,8 +130,39 @@ index 9ce71c7c52..5c7f2d0728 100644 "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/vfs", "//third_party:auth", +diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java +index a2e4abf9d8..93843a91dc 100644 +--- a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java ++++ b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java +@@ -25,12 +25,18 @@ final class DownloadCommand { + private final boolean casDownload; + private final Digest digest; + private final OutputStream out; ++ private final long offset; + +- DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) { ++ DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out, long offset) { + this.uri = Preconditions.checkNotNull(uri); + this.casDownload = casDownload; + this.digest = Preconditions.checkNotNull(digest); + this.out = Preconditions.checkNotNull(out); ++ this.offset = offset; ++ } ++ ++ DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) { ++ this(uri, casDownload, digest, out, 0); + } + + public URI uri() { +@@ -48,4 +54,6 @@ final class DownloadCommand { + public OutputStream out() { + return out; + } ++ ++ public long offset() { return offset; } + } diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java -index 1efecd3bb1..16084e4a43 100644 +index 1efecd3bb1..a2366ea596 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.Futures; @@ -112,7 +173,15 @@ index 1efecd3bb1..16084e4a43 100644 import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; -@@ -132,6 +133,7 @@ public final class HttpCacheClient implements RemoteCacheClient { +@@ -84,6 +85,7 @@ import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicBoolean; ++import java.util.concurrent.atomic.AtomicLong; + import java.util.function.Function; + import java.util.regex.Pattern; + import javax.annotation.Nullable; +@@ -132,6 +134,7 @@ public final class HttpCacheClient implements RemoteCacheClient { private final boolean useTls; private final boolean verifyDownloads; private final DigestUtil digestUtil; @@ -120,7 +189,7 @@ index 1efecd3bb1..16084e4a43 100644 private final Object closeLock = new Object(); -@@ -153,6 +155,7 @@ public final class HttpCacheClient implements RemoteCacheClient { +@@ -153,6 +156,7 @@ public final class HttpCacheClient implements RemoteCacheClient { boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, @@ -128,7 +197,7 @@ index 1efecd3bb1..16084e4a43 100644 @Nullable final Credentials creds) throws Exception { return new HttpCacheClient( -@@ -164,6 +167,7 @@ public final class HttpCacheClient implements RemoteCacheClient { +@@ -164,6 +168,7 @@ public final class HttpCacheClient implements RemoteCacheClient { verifyDownloads, extraHttpHeaders, digestUtil, @@ -136,7 +205,7 @@ index 1efecd3bb1..16084e4a43 100644 creds, null); } -@@ -176,6 +180,7 @@ public final class HttpCacheClient implements RemoteCacheClient { +@@ -176,6 +181,7 @@ public final class HttpCacheClient implements RemoteCacheClient { boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, @@ -144,7 +213,7 @@ index 1efecd3bb1..16084e4a43 100644 @Nullable final Credentials creds) throws Exception { -@@ -189,6 +194,7 @@ public final class HttpCacheClient implements RemoteCacheClient { +@@ -189,6 +195,7 @@ public final class HttpCacheClient implements RemoteCacheClient { verifyDownloads, extraHttpHeaders, digestUtil, @@ -152,7 +221,7 @@ index 1efecd3bb1..16084e4a43 100644 creds, domainSocketAddress); } else if (Epoll.isAvailable()) { -@@ -201,6 +207,7 @@ public final class HttpCacheClient implements RemoteCacheClient { +@@ -201,6 +208,7 @@ public final class HttpCacheClient implements RemoteCacheClient { verifyDownloads, extraHttpHeaders, digestUtil, @@ -160,7 +229,7 @@ index 1efecd3bb1..16084e4a43 100644 creds, domainSocketAddress); } else { -@@ -217,6 +224,7 @@ public final class HttpCacheClient implements RemoteCacheClient { +@@ -217,6 +225,7 @@ public final class HttpCacheClient implements RemoteCacheClient { boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, @@ -168,7 +237,7 @@ index 1efecd3bb1..16084e4a43 100644 @Nullable final Credentials creds, @Nullable SocketAddress socketAddress) throws Exception { -@@ -285,6 +293,7 @@ public final class HttpCacheClient implements RemoteCacheClient { +@@ -285,6 +294,7 @@ public final class HttpCacheClient implements RemoteCacheClient { this.extraHttpHeaders = extraHttpHeaders; this.verifyDownloads = verifyDownloads; this.digestUtil = digestUtil; @@ -176,73 +245,157 @@ index 1efecd3bb1..16084e4a43 100644 } @SuppressWarnings("FutureReturnValueIgnored") -@@ -440,22 +449,24 @@ public final class HttpCacheClient implements RemoteCacheClient { - @Override - public ListenableFuture downloadBlob( - RemoteActionExecutionContext context, Digest digest, OutputStream out) { -- final DigestOutputStream digestOut = -- verifyDownloads ? digestUtil.newDigestOutputStream(out) : null; -- return Futures.transformAsync( -- get(digest, digestOut != null ? digestOut : out, /* casDownload= */ true), -- (v) -> { -- try { -- if (digestOut != null) { -- Utils.verifyBlobContents(digest, digestOut.digest()); -- } -- out.flush(); -- return Futures.immediateFuture(null); -- } catch (IOException e) { -- return Futures.immediateFailedFuture(e); -- } -- }, -- MoreExecutors.directExecutor()); +@@ -461,6 +471,7 @@ public final class HttpCacheClient implements RemoteCacheClient { + @SuppressWarnings("FutureReturnValueIgnored") + private ListenableFuture get(Digest digest, final OutputStream out, boolean casDownload) { + final AtomicBoolean dataWritten = new AtomicBoolean(); ++ AtomicLong bytesDownloaded = new AtomicLong(); + OutputStream wrappedOut = + new OutputStream() { + // OutputStream.close() does nothing, which is what we want to ensure that the +@@ -470,12 +481,14 @@ public final class HttpCacheClient implements RemoteCacheClient { + @Override + public void write(byte[] b, int offset, int length) throws IOException { + dataWritten.set(true); ++ bytesDownloaded.addAndGet(length); + out.write(b, offset, length); + } + + @Override + public void write(int b) throws IOException { + dataWritten.set(true); ++ bytesDownloaded.incrementAndGet(); + out.write(b); + } + +@@ -484,57 +497,59 @@ public final class HttpCacheClient implements RemoteCacheClient { + out.flush(); + } + }; +- DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut); +- SettableFuture outerF = SettableFuture.create(); +- acquireDownloadChannel() +- .addListener( +- (Future channelPromise) -> { +- if (!channelPromise.isSuccess()) { +- outerF.setException(channelPromise.cause()); +- return; +- } +- +- Channel ch = channelPromise.getNow(); +- ch.writeAndFlush(downloadCmd) +- .addListener( +- (f) -> { +- try { +- if (f.isSuccess()) { +- outerF.set(null); +- } else { +- Throwable cause = f.cause(); +- // cause can be of type HttpException, because Netty uses +- // Unsafe.throwException to +- // re-throw a checked exception that hasn't been declared in the method +- // signature. +- if (cause instanceof HttpException) { +- HttpResponse response = ((HttpException) cause).response(); +- if (!dataWritten.get() && authTokenExpired(response)) { +- // The error is due to an auth token having expired. Let's try +- // again. +- try { +- refreshCredentials(); +- getAfterCredentialRefresh(downloadCmd, outerF); +- return; +- } catch (IOException e) { +- cause.addSuppressed(e); +- } catch (RuntimeException e) { +- logger.atWarning().withCause(e).log("Unexpected exception"); +- cause.addSuppressed(e); +- } +- } else if (cacheMiss(response.status())) { +- outerF.setException(new CacheNotFoundException(digest)); +- return; +- } +- } +- outerF.setException(cause); +- } +- } finally { +- releaseDownloadChannel(ch); + return retrier.executeAsync(() -> { -+ final DigestOutputStream digestOut = -+ verifyDownloads ? digestUtil.newDigestOutputStream(out) : null; -+ return Futures.transformAsync( -+ get(digest, digestOut != null ? digestOut : out, /* casDownload= */ true), -+ (v) -> { -+ try { -+ if (digestOut != null) { -+ Utils.verifyBlobContents(digest, digestOut.digest()); -+ } -+ out.flush(); -+ return Futures.immediateFuture(null); -+ } catch (IOException e) { -+ return Futures.immediateFailedFuture(e); -+ } -+ }, -+ MoreExecutors.directExecutor()); ++ DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut, bytesDownloaded.get()); ++ SettableFuture outerF = SettableFuture.create(); ++ acquireDownloadChannel() ++ .addListener( ++ (Future channelPromise) -> { ++ if (!channelPromise.isSuccess()) { ++ outerF.setException(channelPromise.cause()); ++ return; + } ++ ++ Channel ch = channelPromise.getNow(); ++ ch.writeAndFlush(downloadCmd) ++ .addListener( ++ (f) -> { ++ try { ++ if (f.isSuccess()) { ++ outerF.set(null); ++ } else { ++ Throwable cause = f.cause(); ++ // cause can be of type HttpException, because Netty uses ++ // Unsafe.throwException to ++ // re-throw a checked exception that hasn't been declared in the method ++ // signature. ++ if (cause instanceof HttpException) { ++ HttpResponse response = ((HttpException) cause).response(); ++ if (!dataWritten.get() && authTokenExpired(response)) { ++ // The error is due to an auth token having expired. Let's try ++ // again. ++ try { ++ refreshCredentials(); ++ getAfterCredentialRefresh(downloadCmd, outerF); ++ return; ++ } catch (IOException e) { ++ cause.addSuppressed(e); ++ } catch (RuntimeException e) { ++ logger.atWarning().withCause(e).log("Unexpected exception"); ++ cause.addSuppressed(e); ++ } ++ } else if (cacheMiss(response.status())) { ++ outerF.setException(new CacheNotFoundException(digest)); ++ return; ++ } ++ } ++ outerF.setException(cause); ++ } ++ } finally { ++ releaseDownloadChannel(ch); ++ } ++ }); + }); +- }); +- return outerF; ++ return outerF; + }); } @SuppressWarnings("FutureReturnValueIgnored") -@@ -575,8 +586,8 @@ public final class HttpCacheClient implements RemoteCacheClient { - @Override - public ListenableFuture downloadActionResult( - RemoteActionExecutionContext context, ActionKey actionKey, boolean inlineOutErr) { -- return Utils.downloadAsActionResult( -- actionKey, (digest, out) -> get(digest, out, /* casDownload= */ false)); -+ return retrier.executeAsync(() -> Utils.downloadAsActionResult( -+ actionKey, (digest, out) -> get(digest, out, /* casDownload= */ false))); - } - - @SuppressWarnings("FutureReturnValueIgnored") -@@ -673,20 +684,22 @@ public final class HttpCacheClient implements RemoteCacheClient { +@@ -673,20 +688,21 @@ public final class HttpCacheClient implements RemoteCacheClient { @Override public ListenableFuture uploadFile( RemoteActionExecutionContext context, Digest digest, Path file) { -+ InputStream in; - try { +- try { - return uploadAsync( - digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true); -+ in = file.getInputStream(); - } catch (IOException e) { - // Can be thrown from file.getInputStream. - return Futures.immediateFailedFuture(e); - } -+ return retrier.executeAsync(() -> uploadAsync( -+ digest.getHash(), digest.getSizeBytes(), in, /* casUpload= */ true)); +- } catch (IOException e) { +- // Can be thrown from file.getInputStream. +- return Futures.immediateFailedFuture(e); +- } ++ return retrier.executeAsync(() -> { ++ try { ++ return uploadAsync(digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true); ++ } catch (IOException e) { ++ // Can be thrown from file.getInputStream. ++ return Futures.immediateFailedFuture(e); ++ } ++ }); } @Override @@ -255,3 +408,154 @@ index 1efecd3bb1..16084e4a43 100644 } @Override +diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java +index 50d83d138a..f38dad965f 100644 +--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java ++++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java +@@ -20,24 +20,18 @@ import com.google.common.collect.ImmutableList; + import io.netty.buffer.ByteBuf; + import io.netty.channel.ChannelHandlerContext; + import io.netty.channel.ChannelPromise; +-import io.netty.handler.codec.http.DefaultFullHttpRequest; +-import io.netty.handler.codec.http.HttpContent; +-import io.netty.handler.codec.http.HttpHeaderNames; +-import io.netty.handler.codec.http.HttpHeaderValues; +-import io.netty.handler.codec.http.HttpMethod; +-import io.netty.handler.codec.http.HttpObject; +-import io.netty.handler.codec.http.HttpRequest; +-import io.netty.handler.codec.http.HttpResponse; +-import io.netty.handler.codec.http.HttpResponseStatus; +-import io.netty.handler.codec.http.HttpUtil; +-import io.netty.handler.codec.http.HttpVersion; +-import io.netty.handler.codec.http.LastHttpContent; ++import io.netty.handler.codec.http.*; + import io.netty.handler.timeout.ReadTimeoutException; + import io.netty.util.internal.StringUtil; ++ + import java.io.ByteArrayOutputStream; + import java.io.IOException; + import java.io.OutputStream; + import java.util.Map.Entry; ++import java.util.Optional; ++import java.util.OptionalInt; ++import java.util.regex.Matcher; ++import java.util.regex.Pattern; + + /** ChannelHandler for downloads. */ + final class HttpDownloadHandler extends AbstractHttpHandler { +@@ -51,6 +45,10 @@ final class HttpDownloadHandler extends AbstractHttpHandler { + private long contentLength = -1; + /** the path header in the http request */ + private String path; ++ /** the offset at which to download */ ++ private long offset; ++ /** the bytes to skip in a full or chunked response */ ++ private OptionalInt skipBytes; + + public HttpDownloadHandler( + Credentials credentials, ImmutableList> extraHttpHeaders) { +@@ -93,7 +91,25 @@ final class HttpDownloadHandler extends AbstractHttpHandler { + if (contentLengthSet) { + contentLength = HttpUtil.getContentLength(response); + } +- downloadSucceeded = response.status().equals(HttpResponseStatus.OK); ++ boolean full_content = response.status().equals(HttpResponseStatus.OK); ++ boolean partial_content = response.status().equals(HttpResponseStatus.PARTIAL_CONTENT); ++ if (full_content) { ++ if (offset != 0) { ++ // We requested a range but the server replied with a full response. ++ // We need to skip `offset` bytes of the response. ++ if (!skipBytes.isPresent()) { ++ // This is the first chunk, or the full response. ++ skipBytes = OptionalInt.of((int)offset); ++ } ++ } ++ } else if (partial_content) { ++ Optional error = validateContentRangeHeader(response.headers()); ++ if (error.isPresent()) { ++ failAndClose(error.get(), ctx); ++ return; ++ } ++ } ++ downloadSucceeded = full_content || partial_content; + if (!downloadSucceeded) { + out = new ByteArrayOutputStream(); + } +@@ -105,6 +121,15 @@ final class HttpDownloadHandler extends AbstractHttpHandler { + + ByteBuf content = ((HttpContent) msg).content(); + int readableBytes = content.readableBytes(); ++ if (skipBytes.isPresent() && skipBytes.getAsInt() > 0) { ++ int skipNow = skipBytes.getAsInt(); ++ if (skipNow >= readableBytes) { ++ skipNow = readableBytes; ++ } ++ content.readerIndex(content.readerIndex() + skipNow); ++ skipBytes = OptionalInt.of(skipBytes.getAsInt() - skipNow); ++ readableBytes = readableBytes - skipNow; ++ } + content.readBytes(out, readableBytes); + bytesReceived += readableBytes; + if (msg instanceof LastHttpContent) { +@@ -137,7 +162,9 @@ final class HttpDownloadHandler extends AbstractHttpHandler { + DownloadCommand cmd = (DownloadCommand) msg; + out = cmd.out(); + path = constructPath(cmd.uri(), cmd.digest().getHash(), cmd.casDownload()); +- HttpRequest request = buildRequest(path, constructHost(cmd.uri())); ++ offset = cmd.offset(); ++ skipBytes = OptionalInt.empty(); ++ HttpRequest request = buildRequest(path, constructHost(cmd.uri()), cmd.offset()); + addCredentialHeaders(request, cmd.uri()); + addExtraRemoteHeaders(request); + addUserAgentHeader(request); +@@ -159,16 +186,36 @@ final class HttpDownloadHandler extends AbstractHttpHandler { + } + } + +- private HttpRequest buildRequest(String path, String host) { ++ private HttpRequest buildRequest(String path, String host, long offset) { + HttpRequest httpRequest = + new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path); + httpRequest.headers().set(HttpHeaderNames.HOST, host); + httpRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + httpRequest.headers().set(HttpHeaderNames.ACCEPT, "*/*"); + httpRequest.headers().set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP); ++ if (offset != 0) { ++ httpRequest.headers().set(HttpHeaderNames.RANGE, String.format("%s=%d-", HttpHeaderValues.BYTES, offset)); ++ } + return httpRequest; + } + ++ private Optional validateContentRangeHeader(HttpHeaders headers) { ++ if (!headers.contains(HttpHeaderNames.CONTENT_RANGE)) { ++ return Optional.of(new HttpException(response, "Missing 'Content-Range' header", null)); ++ } ++ Pattern pattern = Pattern.compile("bytes\\s+(?[0-9]+)-(?[0-9]+)/(?[0-9]*|\\*)"); ++ Matcher matcher = pattern.matcher(response.headers().get(HttpHeaderNames.CONTENT_RANGE)); ++ if (!matcher.matches()) { ++ return Optional.of(new HttpException(response, "Unexpected 'Content-Range' header", null)); ++ } ++ long start = Long.valueOf(matcher.group("start")); ++ if (start != offset) { ++ return Optional.of(new HttpException( ++ response, String.format("Unexpected 'Content-Range' start: Expected %d but got %d", offset, start), null)); ++ } ++ return Optional.empty(); ++ } ++ + private void succeedAndReset(ChannelHandlerContext ctx) { + // All resets must happen *before* completing the user promise. Otherwise there is a race + // condition, where this handler can be reused even though it is closed. In addition, if reset +diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java +index 89fde56046..6a2bfd5a50 100644 +--- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java ++++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java +@@ -18,7 +18,7 @@ import io.netty.handler.codec.http.HttpResponse; + import java.io.IOException; + + /** An exception that propagates the http status. */ +-final class HttpException extends IOException { ++public final class HttpException extends IOException { + private final HttpResponse response; + + HttpException(HttpResponse response, String message, Throwable cause) {