Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote: Fixes a confusion that background upload counter could increase after build finished. #13954

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
import com.google.devtools.build.lib.concurrent.ThreadSafety;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.exec.SpawnProgressEvent;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import com.google.devtools.build.lib.remote.common.LazyFileOutputStream;
Expand Down Expand Up @@ -66,7 +62,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

/**
* A cache for storing artifacts (input and output) as well as the output of running an action.
Expand All @@ -85,7 +80,6 @@ public class RemoteCache extends AbstractReferenceCounted {
private static final ListenableFuture<Void> COMPLETED_SUCCESS = immediateFuture(null);
private static final ListenableFuture<byte[]> EMPTY_BYTES = immediateFuture(new byte[0]);

private final ExtendedEventHandler reporter;
private final CountDownLatch closeCountDownLatch = new CountDownLatch(1);
protected final AsyncTaskCache.NoResult<Digest> casUploadCache = AsyncTaskCache.NoResult.create();

Expand All @@ -94,11 +88,9 @@ public class RemoteCache extends AbstractReferenceCounted {
protected final DigestUtil digestUtil;

public RemoteCache(
ExtendedEventHandler reporter,
RemoteCacheClient cacheProtocol,
RemoteOptions options,
DigestUtil digestUtil) {
this.reporter = reporter;
this.cacheProtocol = cacheProtocol;
this.options = options;
this.digestUtil = digestUtil;
Expand All @@ -110,23 +102,6 @@ public CachedActionResult downloadActionResult(
return getFromFuture(cacheProtocol.downloadActionResult(context, actionKey, inlineOutErr));
}

private void postUploadStartedEvent(@Nullable ActionExecutionMetadata action, String resourceId) {
if (action == null) {
return;
}

reporter.post(ActionUploadStartedEvent.create(action, resourceId));
}

private void postUploadFinishedEvent(
@Nullable ActionExecutionMetadata action, String resourceId) {
if (action == null) {
return;
}

reporter.post(ActionUploadFinishedEvent.create(action, resourceId));
}

/**
* Returns a set of digests that the remote cache does not know about. The returned set is
* guaranteed to be a subset of {@code digests}.
Expand All @@ -143,38 +118,14 @@ public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(
public ListenableFuture<Void> uploadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, ActionResult actionResult) {

ActionExecutionMetadata action = context.getSpawnOwner();

Completable upload =
Completable.using(
() -> {
String resourceId = "ac/" + actionKey.getDigest().getHash();
postUploadStartedEvent(action, resourceId);
return resourceId;
},
resourceId ->
RxFutures.toCompletable(
() -> cacheProtocol.uploadActionResult(context, actionKey, actionResult),
directExecutor()),
resourceId -> postUploadFinishedEvent(action, resourceId));
RxFutures.toCompletable(
() -> cacheProtocol.uploadActionResult(context, actionKey, actionResult),
directExecutor());

return RxFutures.toListenableFuture(upload);
}

private Completable doUploadFile(RemoteActionExecutionContext context, Digest digest, Path file) {
ActionExecutionMetadata action = context.getSpawnOwner();
return Completable.using(
() -> {
String resourceId = "cas/" + digest.getHash();
postUploadStartedEvent(action, resourceId);
return resourceId;
},
resourceId ->
RxFutures.toCompletable(
() -> cacheProtocol.uploadFile(context, digest, file), directExecutor()),
resourceId -> postUploadFinishedEvent(action, resourceId));
}

/**
* Upload a local file to the remote cache.
*
Expand All @@ -191,26 +142,15 @@ public final ListenableFuture<Void> uploadFile(
return COMPLETED_SUCCESS;
}

Completable upload = casUploadCache.executeIfNot(digest, doUploadFile(context, digest, file));
Completable upload =
casUploadCache.executeIfNot(
digest,
RxFutures.toCompletable(
() -> cacheProtocol.uploadFile(context, digest, file), directExecutor()));

return RxFutures.toListenableFuture(upload);
}

private Completable doUploadBlob(
RemoteActionExecutionContext context, Digest digest, ByteString data) {
ActionExecutionMetadata action = context.getSpawnOwner();
return Completable.using(
() -> {
String resourceId = "cas/" + digest.getHash();
postUploadStartedEvent(action, resourceId);
return resourceId;
},
resourceId ->
RxFutures.toCompletable(
() -> cacheProtocol.uploadBlob(context, digest, data), directExecutor()),
resourceId -> postUploadFinishedEvent(action, resourceId));
}

/**
* Upload sequence of bytes to the remote cache.
*
Expand All @@ -227,7 +167,11 @@ public final ListenableFuture<Void> uploadBlob(
return COMPLETED_SUCCESS;
}

Completable upload = casUploadCache.executeIfNot(digest, doUploadBlob(context, digest, data));
Completable upload =
casUploadCache.executeIfNot(
digest,
RxFutures.toCompletable(
() -> cacheProtocol.uploadBlob(context, digest, data), directExecutor()));

return RxFutures.toListenableFuture(upload);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
Expand All @@ -43,11 +42,10 @@
public class RemoteExecutionCache extends RemoteCache {

public RemoteExecutionCache(
ExtendedEventHandler reporter,
RemoteCacheClient protocolImpl,
RemoteOptions options,
DigestUtil digestUtil) {
super(reporter, protocolImpl, options, digestUtil);
super(protocolImpl, options, digestUtil);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.DirectoryMetadata;
import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.FileMetadata;
Expand Down Expand Up @@ -1070,7 +1071,7 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult)
Single.using(
remoteCache::retain,
remoteCache ->
manifest.uploadAsync(action.getRemoteActionExecutionContext(), remoteCache),
manifest.uploadAsync(action.getRemoteActionExecutionContext(), remoteCache, reporter),
RemoteCache::release)
.subscribeOn(scheduler)
.subscribe(
Expand All @@ -1087,7 +1088,10 @@ public void onError(@NonNull Throwable e) {
}
});
} else {
manifest.upload(action.getRemoteActionExecutionContext(), remoteCache);
try (SilentCloseable c =
Profiler.instance().profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) {
manifest.upload(action.getRemoteActionExecutionContext(), remoteCache, reporter);
}
}
} catch (IOException e) {
reportUploadError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private void initHttpAndDiskCache(
return;
}
RemoteCache remoteCache =
new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
new RemoteCache(cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteCaching(
executorService, env, remoteCache, /* retryScheduler= */ null, digestUtil);
Expand Down Expand Up @@ -576,7 +576,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
}
execChannel.release();
RemoteExecutionCache remoteCache =
new RemoteExecutionCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
new RemoteExecutionCache(cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteExecution(
executorService,
Expand Down Expand Up @@ -613,7 +613,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
}

RemoteCache remoteCache =
new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
new RemoteCache(cacheClient, remoteOptions, digestUtil);
actionContextProvider =
RemoteActionContextProvider.createForRemoteCaching(
executorService, env, remoteCache, retryScheduler, digestUtil);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ public void store(SpawnResult result) throws ExecException, InterruptedException
}
}

try (SilentCloseable c = prof.profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) {
remoteExecutionService.uploadOutputs(action, result);
}
remoteExecutionService.uploadOutputs(action, result);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,7 @@ SpawnResult execLocallyAndUpload(
}
}

try (SilentCloseable c = Profiler.instance().profile(UPLOAD_TIME, "upload outputs")) {
remoteExecutionService.uploadOutputs(action, result);
}
remoteExecutionService.uploadOutputs(action, result);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,20 @@
import build.bazel.remote.execution.v2.Tree;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.remote.common.RemotePathResolver;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxUtils;
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution;
import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution.Code;
Expand All @@ -56,6 +62,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/** UploadManifest adds output metadata to a {@link ActionResult}. */
Expand Down Expand Up @@ -340,10 +347,11 @@ ActionResult getActionResult() {
}

/** Uploads outputs and action result (if exit code is 0) to remote cache. */
public ActionResult upload(RemoteActionExecutionContext context, RemoteCache remoteCache)
public ActionResult upload(
RemoteActionExecutionContext context, RemoteCache remoteCache, ExtendedEventHandler reporter)
throws IOException, InterruptedException {
try {
return uploadAsync(context, remoteCache).blockingGet();
return uploadAsync(context, remoteCache, reporter).blockingGet();
} catch (RuntimeException e) {
throwIfInstanceOf(e.getCause(), InterruptedException.class);
throwIfInstanceOf(e.getCause(), IOException.class);
Expand All @@ -367,29 +375,91 @@ private Completable upload(
return toCompletable(() -> remoteCache.uploadBlob(context, digest, blob), directExecutor());
}

private static void reportUploadStarted(
ExtendedEventHandler reporter,
@Nullable ActionExecutionMetadata action,
String prefix,
Iterable<Digest> digests) {
if (action != null) {
for (Digest digest : digests) {
reporter.post(ActionUploadStartedEvent.create(action, prefix + digest.getHash()));
}
}
}

private static void reportUploadFinished(
ExtendedEventHandler reporter,
@Nullable ActionExecutionMetadata action,
String resourceIdPrefix,
Iterable<Digest> digests) {
if (action != null) {
for (Digest digest : digests) {
reporter.post(
ActionUploadFinishedEvent.create(action, resourceIdPrefix + digest.getHash()));
}
}
}

/**
* Returns a {@link Single} which upon subscription will upload outputs and action result (if exit
* code is 0) to remote cache.
*/
public Single<ActionResult> uploadAsync(
RemoteActionExecutionContext context, RemoteCache remoteCache) {
RemoteActionExecutionContext context,
RemoteCache remoteCache,
ExtendedEventHandler reporter) {
Collection<Digest> digests = new ArrayList<>();
digests.addAll(digestToFile.keySet());
digests.addAll(digestToBlobs.keySet());

Completable uploadOutputs =
mergeBulkTransfer(
toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
.flatMapPublisher(Flowable::fromIterable)
.flatMapSingle(digest -> toTransferResult(upload(context, remoteCache, digest))));
ActionExecutionMetadata action = context.getSpawnOwner();

String outputPrefix = "cas/";
Flowable<RxUtils.TransferResult> bulkTransfers =
toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
.doOnSubscribe(d -> reportUploadStarted(reporter, action, outputPrefix, digests))
.doOnError(error -> reportUploadFinished(reporter, action, outputPrefix, digests))
.doOnDispose(() -> reportUploadFinished(reporter, action, outputPrefix, digests))
.doOnSuccess(
missingDigests -> {
List<Digest> existedDigests =
digests.stream()
.filter(digest -> !missingDigests.contains(digest))
.collect(Collectors.toList());
reportUploadFinished(reporter, action, outputPrefix, existedDigests);
})
.flatMapPublisher(Flowable::fromIterable)
.flatMapSingle(
digest ->
toTransferResult(upload(context, remoteCache, digest))
.doFinally(
() ->
reportUploadFinished(
reporter, action, outputPrefix, ImmutableList.of(digest))));
Completable uploadOutputs = mergeBulkTransfer(bulkTransfers);

ActionResult actionResult = result.build();
Completable uploadActionResult = Completable.complete();
if (actionResult.getExitCode() == 0 && actionKey != null) {
String actionResultPrefix = "ac/";
uploadActionResult =
toCompletable(
() -> remoteCache.uploadActionResult(context, actionKey, actionResult),
directExecutor());
() -> remoteCache.uploadActionResult(context, actionKey, actionResult),
directExecutor())
.doOnSubscribe(
d ->
reportUploadStarted(
reporter,
action,
actionResultPrefix,
ImmutableList.of(actionKey.getDigest())))
.doFinally(
() ->
reportUploadFinished(
reporter,
action,
actionResultPrefix,
ImmutableList.of(actionKey.getDigest())));
}

return Completable.concatArray(uploadOutputs, uploadActionResult).toSingleDefault(actionResult);
Expand Down
Loading