Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[5.1] Remote: Fix crashes by InterruptedException when dynamic execution is… #15091

Merged
merged 1 commit into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,39 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle;
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult;
import static java.lang.String.format;

import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Directory;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxFutures;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.protobuf.Message;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.subjects.AsyncSubject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;

/** A {@link RemoteCache} with additional functionality needed for remote execution. */
public class RemoteExecutionCache extends RemoteCache {
Expand Down Expand Up @@ -72,62 +81,58 @@ public void ensureInputsPresent(
.addAll(additionalInputs.keySet())
.build();

// Collect digests that are not being or already uploaded
ConcurrentHashMap<Digest, AsyncSubject<Boolean>> missingDigestSubjects =
new ConcurrentHashMap<>();

List<ListenableFuture<Void>> uploadFutures = new ArrayList<>();
for (Digest digest : allDigests) {
Completable upload =
casUploadCache.execute(
digest,
Completable.defer(
() -> {
// The digest hasn't been processed, add it to the collection which will be used
// later for findMissingDigests call
AsyncSubject<Boolean> missingDigestSubject = AsyncSubject.create();
missingDigestSubjects.put(digest, missingDigestSubject);

return missingDigestSubject.flatMapCompletable(
missing -> {
if (!missing) {
return Completable.complete();
}
return RxFutures.toCompletable(
() -> uploadBlob(context, digest, merkleTree, additionalInputs),
MoreExecutors.directExecutor());
});
}),
force);
uploadFutures.add(RxFutures.toListenableFuture(upload));
if (allDigests.isEmpty()) {
return;
}

ImmutableSet<Digest> missingDigests;
try {
missingDigests = getFromFuture(findMissingDigests(context, missingDigestSubjects.keySet()));
} catch (IOException | InterruptedException e) {
for (Map.Entry<Digest, AsyncSubject<Boolean>> entry : missingDigestSubjects.entrySet()) {
entry.getValue().onError(e);
}
MissingDigestFinder missingDigestFinder = new MissingDigestFinder(context, allDigests.size());
Flowable<TransferResult> uploads =
Flowable.fromIterable(allDigests)
.flatMapSingle(
digest ->
uploadBlobIfMissing(
context, merkleTree, additionalInputs, force, missingDigestFinder, digest));

if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
try {
mergeBulkTransfer(uploads).blockingAwait();
} catch (RuntimeException e) {
Throwable cause = e.getCause();
if (cause != null) {
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
Throwables.throwIfInstanceOf(cause, IOException.class);
}
throw e;
}
}

for (Map.Entry<Digest, AsyncSubject<Boolean>> entry : missingDigestSubjects.entrySet()) {
AsyncSubject<Boolean> missingSubject = entry.getValue();
if (missingDigests.contains(entry.getKey())) {
missingSubject.onNext(true);
} else {
// The digest is already existed in the remote cache, skip the upload.
missingSubject.onNext(false);
}
missingSubject.onComplete();
}

waitForBulkTransfer(uploadFutures, /* cancelRemainingOnInterrupt=*/ false);
private Single<TransferResult> uploadBlobIfMissing(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
boolean force,
MissingDigestFinder missingDigestFinder,
Digest digest) {
Completable upload =
casUploadCache.execute(
digest,
Completable.defer(
() ->
// Only reach here if the digest is missing and is not being uploaded.
missingDigestFinder
.registerAndCount(digest)
.flatMapCompletable(
missingDigests -> {
if (missingDigests.contains(digest)) {
return toCompletable(
() -> uploadBlob(context, digest, merkleTree, additionalInputs),
directExecutor());
} else {
return Completable.complete();
}
})),
/* onIgnored= */ missingDigestFinder::count,
force);
return toTransferResult(upload);
}

private ListenableFuture<Void> uploadBlob(
Expand Down Expand Up @@ -159,4 +164,93 @@ private ListenableFuture<Void> uploadBlob(
"findMissingDigests returned a missing digest that has not been requested: %s",
digest)));
}

/**
* A missing digest finder that initiates the request when the internal counter reaches an
* expected count.
*/
class MissingDigestFinder {
private final int expectedCount;

private final AsyncSubject<ImmutableSet<Digest>> digestsSubject;
private final Single<ImmutableSet<Digest>> resultSingle;

@GuardedBy("this")
private final Set<Digest> digests;

@GuardedBy("this")
private int currentCount = 0;

MissingDigestFinder(RemoteActionExecutionContext context, int expectedCount) {
checkArgument(expectedCount > 0, "expectedCount should be greater than 0");
this.expectedCount = expectedCount;
this.digestsSubject = AsyncSubject.create();
this.digests = new HashSet<>();

AtomicBoolean findMissingDigestsCalled = new AtomicBoolean(false);
this.resultSingle =
Single.fromObservable(
digestsSubject
.flatMapSingle(
digests -> {
boolean wasCalled = findMissingDigestsCalled.getAndSet(true);
// Make sure we don't have re-subscription caused by refCount() below.
checkState(!wasCalled, "FindMissingDigests is called more than once");
return toSingle(
() -> findMissingDigests(context, digests), directExecutor());
})
// Use replay here because we could have a race condition that downstream hasn't
// been added to the subscription list (to receive the upstream result) while
// upstream is completed.
.replay(1)
.refCount());
}

/**
* Register the {@code digest} and increase the counter.
*
* <p>Returned Single cannot be subscribed more than once.
*
* @return Single that emits the result of the {@code FindMissingDigest} request.
*/
Single<ImmutableSet<Digest>> registerAndCount(Digest digest) {
AtomicBoolean subscribed = new AtomicBoolean(false);
// count() will potentially trigger the findMissingDigests call. Adding and counting before
// returning the Single could introduce a race that the result of findMissingDigests is
// available but the consumer doesn't get it because it hasn't subscribed the returned
// Single. In this case, it subscribes after upstream is completed resulting a re-run of
// findMissingDigests (due to refCount()).
//
// Calling count() inside doOnSubscribe to ensure the consumer already subscribed to the
// returned Single to avoid a re-execution of findMissingDigests.
return resultSingle.doOnSubscribe(
d -> {
boolean wasSubscribed = subscribed.getAndSet(true);
checkState(!wasSubscribed, "Single is subscribed more than once");
synchronized (this) {
digests.add(digest);
}
count();
});
}

/** Increase the counter. */
void count() {
ImmutableSet<Digest> digestsResult = null;

synchronized (this) {
if (currentCount < expectedCount) {
currentCount++;
if (currentCount == expectedCount) {
digestsResult = ImmutableSet.copyOf(digests);
}
}
}

if (digestsResult != null) {
digestsSubject.onNext(digestsResult);
digestsSubject.onComplete();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Action;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -256,14 +257,25 @@ public boolean isDisposed() {
/**
* Executes a task.
*
* @see #execute(Object, Single, Action, boolean).
*/
public Single<ValueT> execute(KeyT key, Single<ValueT> task, boolean force) {
return execute(key, task, () -> {}, force);
}

/**
* Executes a task. If the task has already finished, this execution of the task is ignored unless
* `force` is true. If the task is in progress this execution of the task is always ignored.
*
* <p>If the cache is already shutdown, a {@link CancellationException} will be emitted.
*
* @param key identifies the task.
* @param onIgnored callback called when provided task is ignored.
* @param force re-execute a finished task if set to {@code true}.
* @return a {@link Single} which turns to completed once the task is finished or propagates the
* error if any.
*/
public Single<ValueT> execute(KeyT key, Single<ValueT> task, boolean force) {
public Single<ValueT> execute(KeyT key, Single<ValueT> task, Action onIgnored, boolean force) {
return Single.create(
emitter -> {
synchronized (lock) {
Expand All @@ -273,14 +285,20 @@ public Single<ValueT> execute(KeyT key, Single<ValueT> task, boolean force) {
}

if (!force && finished.containsKey(key)) {
onIgnored.run();
emitter.onSuccess(finished.get(key));
return;
}

finished.remove(key);

Execution execution =
inProgress.computeIfAbsent(key, ignoredKey -> new Execution(key, task));
Execution execution = inProgress.get(key);
if (execution != null) {
onIgnored.run();
} else {
execution = new Execution(key, task);
inProgress.put(key, execution);
}

// We must subscribe the execution within the scope of lock to avoid race condition
// that:
Expand Down Expand Up @@ -425,10 +443,15 @@ public Completable executeIfNot(KeyT key, Completable task) {
cache.executeIfNot(key, task.toSingleDefault(Optional.empty())));
}

/** Same as {@link AsyncTaskCache#executeIfNot} but operates on {@link Completable}. */
/** Same as {@link AsyncTaskCache#execute} but operates on {@link Completable}. */
public Completable execute(KeyT key, Completable task, boolean force) {
return execute(key, task, () -> {}, force);
}

/** Same as {@link AsyncTaskCache#execute} but operates on {@link Completable}. */
public Completable execute(KeyT key, Completable task, Action onIgnored, boolean force) {
return Completable.fromSingle(
cache.execute(key, task.toSingleDefault(Optional.empty()), force));
cache.execute(key, task.toSingleDefault(Optional.empty()), onIgnored, force));
}

/** Returns a set of keys for tasks which is finished. */
Expand Down
Loading