diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java index 65e17c29c9665e..e235d6f02707af 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java @@ -63,6 +63,7 @@ import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.pkgcache.TargetParsingCompleteEvent; +import com.google.devtools.build.lib.runtime.CountingArtifactGroupNamer.LatchedGroupName; import com.google.devtools.build.lib.util.Pair; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.ArrayList; @@ -370,27 +371,27 @@ private synchronized void close(@Nullable AbortReason reason) { } private void maybeReportArtifactSet(CompletionContext ctx, NestedSet set) { - String name = artifactGroupNamer.maybeName(set); - if (name == null) { - return; - } - - set = NamedArtifactGroup.expandSet(ctx, set); - // Invariant: all leaf successors ("direct elements") of set are ExpandedArtifacts. - - // We only split if the max number of entries is at least 2 (it must be at least a binary tree). - // The method throws for smaller values. - if (besOptions.maxNamedSetEntries >= 2) { - // We only split the event after naming it to avoid splitting the same node multiple times. - // Note that the artifactGroupNames keeps references to the individual pieces, so this can - // double the memory consumption of large nested sets. - set = set.splitIfExceedsMaximumSize(besOptions.maxNamedSetEntries); - } + try (LatchedGroupName lockedName = artifactGroupNamer.maybeName(set)) { + if (lockedName == null) { + return; + } + set = NamedArtifactGroup.expandSet(ctx, set); + // Invariant: all leaf successors ("direct elements") of set are ExpandedArtifacts. 
+ + // We only split if the max number of entries is at least 2 (it must be at least a binary + // tree). The method throws for smaller values. + if (besOptions.maxNamedSetEntries >= 2) { + // We only split the event after naming it to avoid splitting the same node multiple times. + // Note that the artifactGroupNamer keeps references to the individual pieces, so this can + // double the memory consumption of large nested sets. + set = set.splitIfExceedsMaximumSize(besOptions.maxNamedSetEntries); + } - for (NestedSet succ : set.getNonLeaves()) { - maybeReportArtifactSet(ctx, succ); + for (NestedSet succ : set.getNonLeaves()) { + maybeReportArtifactSet(ctx, succ); + } + post(new NamedArtifactGroup(lockedName.getName(), ctx, set)); } - post(new NamedArtifactGroup(name, ctx, set)); } private void maybeReportConfiguration(BuildEvent configuration) { diff --git a/src/main/java/com/google/devtools/build/lib/runtime/CountingArtifactGroupNamer.java b/src/main/java/com/google/devtools/build/lib/runtime/CountingArtifactGroupNamer.java index c68d5e26253045..283c801d6640d0 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/CountingArtifactGroupNamer.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/CountingArtifactGroupNamer.java @@ -13,41 +13,76 @@ // limitations under the License. package com.google.devtools.build.lib.runtime; +import com.google.common.util.concurrent.Uninterruptibles; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildEventId.NamedSetOfFilesId; import com.google.devtools.build.lib.collect.nestedset.NestedSet; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import javax.annotation.concurrent.ThreadSafe; /** Conversion of paths to URIs. 
*/ @ThreadSafe public class CountingArtifactGroupNamer implements ArtifactGroupNamer { - private final Map nodeNames = new HashMap<>(); + private final ConcurrentMap nodeNames = + new ConcurrentHashMap<>(); @Override public NamedSetOfFilesId apply(NestedSet.Node id) { - Integer name; - synchronized (this) { - name = nodeNames.get(id); - } + LatchedGroupName name = nodeNames.get(id); if (name == null) { return null; } - return NamedSetOfFilesId.newBuilder().setId(name.toString()).build(); + return NamedSetOfFilesId.newBuilder().setId(name.getName()).build(); } /** * If the {@link NestedSet} has no name already, return a new name for it. Return null otherwise. */ - public synchronized String maybeName(NestedSet set) { + public LatchedGroupName maybeName(NestedSet set) { NestedSet.Node id = set.toNode(); - if (nodeNames.containsKey(id)) { + LatchedGroupName existingGroupName; + LatchedGroupName newGroupName; + // synchronized necessary only to ensure node names are chosen uniquely and compactly. + // TODO(adgar): consider dropping compactness and unconditionally incrementing an AtomicLong to + // pick unique node names. + synchronized (this) { + newGroupName = new LatchedGroupName(nodeNames.size()); + existingGroupName = nodeNames.putIfAbsent(id, newGroupName); + } + if (existingGroupName != null) { + existingGroupName.waitUntilWritten(); return null; } - Integer name = nodeNames.size(); - nodeNames.put(id, name); - return name.toString(); + return newGroupName; + } + + /** + * A name for a {@code NestedSet} that its creator must {@link #close()} once the set has been + * written, releasing all other consumers blocked in {@link #waitUntilWritten()}. 
+ */ + public static class LatchedGroupName implements AutoCloseable { + private final CountDownLatch latch; + private final int name; + + public LatchedGroupName(int name) { + this.latch = new CountDownLatch(1); + this.name = name; + } + + @Override + public void close() { + latch.countDown(); + } + + String getName() { + return Integer.toString(name); + } + + private void waitUntilWritten() { + Uninterruptibles.awaitUninterruptibly(latch); + } } } diff --git a/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java index f645356907e38e..4c6cf9820dfe62 100644 --- a/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java +++ b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java @@ -69,6 +69,7 @@ import com.google.devtools.build.lib.buildtool.buildevent.NoAnalyzeEvent; import com.google.devtools.build.lib.collect.nestedset.NestedSet; import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder; +import com.google.devtools.build.lib.collect.nestedset.Order; import com.google.devtools.build.lib.server.FailureDetails.FailureDetail; import com.google.devtools.build.lib.server.FailureDetails.Spawn; import com.google.devtools.build.lib.server.FailureDetails.Spawn.Code; @@ -86,6 +87,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -711,6 +713,112 @@ public void testReportedArtifacts() { assertThat(reportedArtifactSets.get(0)).isEqualTo(eventProtos.get(4).getId().getNamedSet()); } + @Test + public void testArtifactSetsPrecedeReportingEvent() throws InterruptedException { + // Verify that reported artifacts appear as named_set_of_files before their ID is referenced by + // a reporting event. 
+ BuildEvent startEvent = + new GenericBuildEvent( + testId("Initial"), ImmutableSet.of(ProgressEvent.INITIAL_PROGRESS_UPDATE)); + + // Prepare a dense NestedSet DAG with lots of shared references. + List> baseSets = new ArrayList<>(); + baseSets.add(NestedSetBuilder.create(Order.STABLE_ORDER, makeArtifact("path/a"))); + baseSets.add(NestedSetBuilder.create(Order.STABLE_ORDER, makeArtifact("path/b"))); + baseSets.add(NestedSetBuilder.create(Order.STABLE_ORDER, makeArtifact("path/c"))); + baseSets.add(NestedSetBuilder.create(Order.STABLE_ORDER, makeArtifact("path/d"))); + List> depth2Sets = new ArrayList<>(); + for (int i = 0; i < baseSets.size(); i++) { + depth2Sets.add( + NestedSetBuilder.stableOrder() + .addTransitive(baseSets.get(i)) + .addTransitive(baseSets.get((i + 1) % baseSets.size())) + .build()); + } + List> depth3Sets = new ArrayList<>(); + for (int i = 0; i < depth2Sets.size(); i++) { + depth3Sets.add( + NestedSetBuilder.stableOrder() + .addTransitive(depth2Sets.get(i)) + .addTransitive(depth2Sets.get((i + 1) % depth2Sets.size())) + .build()); + } + List> depth4Sets = new ArrayList<>(); + for (int i = 0; i < depth3Sets.size(); i++) { + depth4Sets.add( + NestedSetBuilder.stableOrder() + .addTransitive(depth3Sets.get(i)) + .addTransitive(depth3Sets.get((i + 1) % depth3Sets.size())) + .build()); + } + int numEvents = 20; + List eventsToPost = new ArrayList<>(); + for (int i = 0; i < numEvents; i++) { + eventsToPost.add( + new GenericArtifactReportingEvent( + testId("reporting" + i), ImmutableSet.of(depth4Sets.get(i % depth4Sets.size())))); + } + + streamer.buildEvent(startEvent); + // Publish `numEvents` events, each reporting one of the overlapping depth-4 sets, on + // `numEvents` different threads. Use latches to ensure: + // + // 1. all threads have started, before: + // 2. all threads send their event, before: + // 3. verifying the recorded events. 
+ CountDownLatch readyToPublishLatch = new CountDownLatch(numEvents); + CountDownLatch startPublishingLatch = new CountDownLatch(1); + CountDownLatch donePublishingLatch = new CountDownLatch(numEvents); + for (int i = 0; i < numEvents; i++) { + int num = i; + new Thread( + () -> { + try { + BuildEvent reportingArtifacts = eventsToPost.get(num); + readyToPublishLatch.countDown(); + startPublishingLatch.await(); + streamer.buildEvent(reportingArtifacts); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + // Always count down so that a failing thread cannot hang the test. + donePublishingLatch.countDown(); + } + }) + .start(); + } + readyToPublishLatch.await(); + startPublishingLatch.countDown(); + donePublishingLatch.await(); + + assertThat(streamer.isClosed()).isFalse(); + List allEventsSeen = transport.getEvents(); + List eventProtos = transport.getEventProtos(); + // Each GenericArtifactReportingEvent and NamedArtifactGroup event has a corresponding Progress + // event posted immediately before. + assertThat(allEventsSeen) + .hasSize(1 + ((numEvents + baseSets.size() + depth2Sets.size() + depth3Sets.size()) * 2)); + assertThat(allEventsSeen.get(0).getEventId()).isEqualTo(startEvent.getEventId()); + // Verify that each named_set_of_files event is sent before all of the events that report that + // named_set. + Set seenFileSets = new HashSet<>(); + for (int i = 1; i < eventProtos.size(); i++) { + BuildEventStreamProtos.BuildEvent buildEvent = eventProtos.get(i); + if (buildEvent.getId().hasNamedSet()) { + // These are the separately-posted contents of reported artifacts. + seenFileSets.add(buildEvent.getId().getNamedSet().getId()); + for (NamedSetOfFilesId nestedSetId : buildEvent.getNamedSetOfFiles().getFileSetsList()) { + assertThat(seenFileSets).contains(nestedSetId.getId()); + } + } else if (buildEvent.getId().hasUnknown()) { + // These are the GenericArtifactReportingEvents that report artifacts. 
+ for (NamedSetOfFilesId nestedSetId : buildEvent.getNamedSetOfFiles().getFileSetsList()) { + assertThat(seenFileSets).contains(nestedSetId.getId()); + } + } + } + } + @Test public void testStdoutReported() { // Verify that stdout and stderr are reported in the build-event stream on progress