-
Notifications
You must be signed in to change notification settings - Fork 4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Codec for CcCompilationContext.HeaderInfo.
HeaderInfo requires cross-value memoization, like NestedSets. PiperOrigin-RevId: 570562294 Change-Id: I57b666ac876ffa2c6a64634ad7c725dcd4117291
- Loading branch information
1 parent
2052573
commit 9925a5b
Showing
3 changed files
with
337 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
309 changes: 309 additions & 0 deletions
309
src/main/java/com/google/devtools/build/lib/rules/cpp/HeaderInfoCodec.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,309 @@ | ||
// Copyright 2023 The Bazel Authors. All rights reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
package com.google.devtools.build.lib.rules.cpp; | ||
|
||
import static com.google.common.base.Throwables.throwIfInstanceOf; | ||
import static com.google.common.util.concurrent.Futures.immediateFuture; | ||
import static com.google.common.util.concurrent.MoreExecutors.directExecutor; | ||
|
||
import com.google.common.collect.ImmutableList; | ||
import com.google.common.hash.Hashing; | ||
import com.google.common.util.concurrent.Futures; | ||
import com.google.common.util.concurrent.ListenableFuture; | ||
import com.google.common.util.concurrent.SettableFuture; | ||
import com.google.devtools.build.lib.actions.Artifact; | ||
import com.google.devtools.build.lib.actions.Artifact.DerivedArtifact; | ||
import com.google.devtools.build.lib.collect.nestedset.NestedSetStore.NestedSetStorageEndpoint; | ||
import com.google.devtools.build.lib.rules.cpp.CcCompilationContext.HeaderInfo; | ||
import com.google.devtools.build.lib.skyframe.serialization.DeserializationContext; | ||
import com.google.devtools.build.lib.skyframe.serialization.ObjectCodec; | ||
import com.google.devtools.build.lib.skyframe.serialization.SerializationContext; | ||
import com.google.devtools.build.lib.skyframe.serialization.SerializationException; | ||
import com.google.protobuf.ByteString; | ||
import com.google.protobuf.CodedInputStream; | ||
import com.google.protobuf.CodedOutputStream; | ||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.InterruptedIOException; | ||
import java.util.ArrayList; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.Executor; | ||
|
||
/** | ||
* A codec for {@link HeaderInfo} with intra-value memoization. | ||
* | ||
* <p>{@link HeaderInfo} recursively refers to other {@link HeaderInfo} instances that must be | ||
* serialized in memoized fashion to avoid quadratic storage costs. | ||
*/ | ||
public final class HeaderInfoCodec implements ObjectCodec<HeaderInfo> { | ||
private final NestedSetStorageEndpoint storageEndpoint; | ||
|
||
/** | ||
* An executor that performs deserialization computations when bytes are fetched from storage. | ||
* | ||
* <p>This exists to avoid tying up an RPC callback thread. | ||
*/ | ||
private final Executor executor; | ||
|
||
// TODO(b/297857068): while this is only in test code, a ConcurrentHashMap is fine. In production, | ||
// we will need to carefully consider how entries in these maps are invalidated. This could be as | ||
// simple as wiping them between invocations. | ||
// TODO(b/297857068): in both of the maps below, it is possible to unwrap the futures after they | ||
// complete to reduce memory consumption. | ||
private final ConcurrentHashMap<ByteString, ListenableFuture<HeaderInfo>> fingerprintToValue = | ||
new ConcurrentHashMap<>(); | ||
private final ConcurrentHashMap<HeaderInfo, ListenableFuture<ByteString>> referenceToFingerprint = | ||
new ConcurrentHashMap<>(); | ||
|
||
/** | ||
* A parameter set to improve coverage in round-trip testing. | ||
* | ||
* <p>When false, completion of serialization populates the {@link #fingerprintToValue} entries, | ||
* which is used to look up values from fingerprints during deserialization. However, this means | ||
* that when round-tripping in test scenarios, the deserialization isn't really exercised as it | ||
* always hits the populated {@link #fingerprintToValue} entries. Setting this flag true disables | ||
* populating {@link #fingerprintToValue} from serialization so the deserialization code can be | ||
* exercised. | ||
*/ | ||
private final boolean exerciseDeserializationForTesting; | ||
|
||
public HeaderInfoCodec( | ||
NestedSetStorageEndpoint storageEndpoint, | ||
Executor executor, | ||
boolean exerciseDeserializationForTesting) { | ||
this.storageEndpoint = storageEndpoint; | ||
this.executor = executor; | ||
this.exerciseDeserializationForTesting = exerciseDeserializationForTesting; | ||
} | ||
|
||
@Override | ||
public Class<HeaderInfo> getEncodedClass() { | ||
return HeaderInfo.class; | ||
} | ||
|
||
@Override | ||
public void serialize(SerializationContext context, HeaderInfo obj, CodedOutputStream codedOut) | ||
throws SerializationException, IOException { | ||
ByteString fingerprint; | ||
try { | ||
fingerprint = serializeToStorage(context, obj).get(); | ||
} catch (InterruptedException e) { | ||
throw new InterruptedIOException(); | ||
} catch (ExecutionException e) { | ||
Throwable cause = e.getCause(); | ||
throwIfInstanceOf(cause, SerializationException.class); | ||
throwIfInstanceOf(cause, IOException.class); | ||
throw new SerializationException("Unexpected execution exception", cause); | ||
} | ||
codedOut.writeBytesNoTag(fingerprint); | ||
} | ||
|
||
@Override | ||
public HeaderInfo deserialize(DeserializationContext context, CodedInputStream codedIn) | ||
throws SerializationException, IOException { | ||
ByteString fingerprint = codedIn.readBytes(); | ||
try { | ||
return deserializeFromStorage(context, fingerprint).get(); | ||
} catch (InterruptedException e) { | ||
throw new InterruptedIOException(); | ||
} catch (ExecutionException e) { | ||
Throwable cause = e.getCause(); | ||
throwIfInstanceOf(cause, SerializationException.class); | ||
throwIfInstanceOf(cause, IOException.class); | ||
throw new SerializationException("Unexpected execution exception", cause); | ||
} | ||
} | ||
|
||
/** | ||
* Serializes {@code headerInfo} to storage. | ||
* | ||
* <p>{@link HeaderInfo} is a recursive data structure that may include an entire DAG of {@link | ||
* HeaderInfo} values. This method is called recursively as subvalues occur. | ||
* | ||
* @return a unique fingerprint mapped to the {@link HeaderInfo} value | ||
*/ | ||
private ListenableFuture<ByteString> serializeToStorage( | ||
SerializationContext baseContext, HeaderInfo headerInfo) | ||
throws SerializationException, IOException { | ||
var settableFingerprint = SettableFuture.<ByteString>create(); | ||
var previousFingerprint = referenceToFingerprint.putIfAbsent(headerInfo, settableFingerprint); | ||
if (previousFingerprint != null) { | ||
return previousFingerprint; | ||
} | ||
|
||
// The context must be created fresh each time to reset the memoization state. The result should | ||
// be deserializable on its own, without additional context. | ||
SerializationContext context = baseContext.getNewMemoizingContext(); | ||
// Care must be taken to ensure the SettableFuture is actually set to avoid hanging elsewhere. | ||
boolean futureWasSet = false; | ||
try { | ||
var bytesOut = new ByteArrayOutputStream(); | ||
var codedOut = CodedOutputStream.newInstance(bytesOut); | ||
context.serialize(headerInfo.headerModule, codedOut); | ||
context.serialize(headerInfo.picHeaderModule, codedOut); | ||
context.serialize(headerInfo.modularPublicHeaders, codedOut); | ||
context.serialize(headerInfo.modularPrivateHeaders, codedOut); | ||
context.serialize(headerInfo.textualHeaders, codedOut); | ||
context.serialize(headerInfo.separateModuleHeaders, codedOut); | ||
context.serialize(headerInfo.separateModule, codedOut); | ||
context.serialize(headerInfo.separatePicModule, codedOut); | ||
|
||
ImmutableList<HeaderInfo> deps = headerInfo.deps; | ||
codedOut.writeInt32NoTag(deps.size()); | ||
|
||
// In both branches below, the future waits until the bytes are stored in the NestedSetStore. | ||
// It may be possible to achieve higher throughput by making the fingerprints available as | ||
// soon as possible, without waiting for storage to complete, but that would require | ||
// additional coordination to be built somewhere. | ||
if (deps.isEmpty()) { | ||
codedOut.flush(); | ||
settableFingerprint.setFuture(storeByFingerprint(headerInfo, bytesOut.toByteArray())); | ||
} else { | ||
var depFutures = new ArrayList<ListenableFuture<ByteString>>(deps.size()); | ||
for (HeaderInfo dep : deps) { | ||
depFutures.add(serializeToStorage(baseContext, dep)); | ||
} | ||
settableFingerprint.setFuture( | ||
Futures.whenAllSucceed(depFutures) | ||
.callAsync( | ||
() -> { | ||
for (var depFuture : depFutures) { | ||
codedOut.writeBytesNoTag(Futures.getDone(depFuture)); | ||
} | ||
codedOut.flush(); | ||
return storeByFingerprint(headerInfo, bytesOut.toByteArray()); | ||
}, | ||
directExecutor())); | ||
} | ||
futureWasSet = true; | ||
} catch (SerializationException | IOException e) { | ||
settableFingerprint.setException(e); | ||
futureWasSet = true; | ||
throw e; | ||
} finally { | ||
if (!futureWasSet) { | ||
// Fails fast (instead of potentially causing a client to hang) if there's an unanticipated | ||
// runtime exception. | ||
settableFingerprint.setException(new IllegalStateException("The future was not set!")); | ||
} | ||
} | ||
|
||
return settableFingerprint; | ||
} | ||
|
||
private ListenableFuture<HeaderInfo> deserializeFromStorage( | ||
DeserializationContext context, ByteString fingerprint) throws IOException { | ||
var settableValue = SettableFuture.<HeaderInfo>create(); | ||
var previousValue = fingerprintToValue.putIfAbsent(fingerprint, settableValue); | ||
if (previousValue != null) { | ||
return previousValue; | ||
} | ||
|
||
boolean futureWasSet = false; | ||
try { | ||
settableValue.setFuture( | ||
Futures.transformAsync( | ||
storageEndpoint.get(fingerprint), | ||
bytes -> deserializeBytes(context, fingerprint, bytes), | ||
executor)); | ||
futureWasSet = true; | ||
} catch (IOException e) { | ||
settableValue.setException(e); | ||
futureWasSet = true; | ||
throw e; | ||
} finally { | ||
if (!futureWasSet) { | ||
// Fails fast (instead of potentially causing a client to hang) if there's an unanticipated | ||
// runtime exception. | ||
settableValue.setException(new IllegalStateException("The future was not set!")); | ||
} | ||
} | ||
|
||
return settableValue; | ||
} | ||
|
||
private ListenableFuture<HeaderInfo> deserializeBytes( | ||
DeserializationContext baseContext, ByteString fingerprint, byte[] bytes) | ||
throws SerializationException, IOException { | ||
var codedIn = CodedInputStream.newInstance(bytes); | ||
var context = baseContext.getNewMemoizingContext(); | ||
|
||
DerivedArtifact headerModule = context.deserialize(codedIn); | ||
DerivedArtifact picHeaderModule = context.deserialize(codedIn); | ||
ImmutableList<Artifact> modularPublicHeaders = context.deserialize(codedIn); | ||
ImmutableList<Artifact> modularPrivateHeaders = context.deserialize(codedIn); | ||
ImmutableList<Artifact> textualHeaders = context.deserialize(codedIn); | ||
ImmutableList<Artifact> separateModuleHeaders = context.deserialize(codedIn); | ||
DerivedArtifact separateModule = context.deserialize(codedIn); | ||
DerivedArtifact separatePicModule = context.deserialize(codedIn); | ||
|
||
int depCount = codedIn.readInt32(); | ||
if (depCount == 0) { | ||
var headerInfo = | ||
new HeaderInfo( | ||
headerModule, | ||
picHeaderModule, | ||
modularPublicHeaders, | ||
modularPrivateHeaders, | ||
textualHeaders, | ||
separateModuleHeaders, | ||
separateModule, | ||
separatePicModule, | ||
ImmutableList.of()); | ||
referenceToFingerprint.put(headerInfo, immediateFuture(fingerprint)); | ||
return immediateFuture(headerInfo); | ||
} | ||
|
||
var futureDeps = new ArrayList<ListenableFuture<HeaderInfo>>(depCount); | ||
for (int i = 0; i < depCount; ++i) { | ||
futureDeps.add(deserializeFromStorage(baseContext, codedIn.readBytes())); | ||
} | ||
return Futures.whenAllSucceed(futureDeps) | ||
.call( | ||
() -> { | ||
var deps = ImmutableList.<HeaderInfo>builderWithExpectedSize(depCount); | ||
for (var futureDep : futureDeps) { | ||
deps.add(Futures.getDone(futureDep)); | ||
} | ||
var headerInfo = | ||
new HeaderInfo( | ||
headerModule, | ||
picHeaderModule, | ||
modularPublicHeaders, | ||
modularPrivateHeaders, | ||
textualHeaders, | ||
separateModuleHeaders, | ||
separateModule, | ||
separatePicModule, | ||
deps.build()); | ||
referenceToFingerprint.put(headerInfo, immediateFuture(fingerprint)); | ||
return headerInfo; | ||
}, | ||
directExecutor()); | ||
} | ||
|
||
private ListenableFuture<ByteString> storeByFingerprint(HeaderInfo info, byte[] bytes) | ||
throws IOException { | ||
// TODO(b/297857068): observe that the fingerprint value is not made available until after a | ||
// round trip to through the storage service. This means that it should be possible to replace | ||
// the locally generated fingerprint with a storage-service generated UUID. | ||
ByteString fingerprint = ByteString.copyFrom(Hashing.md5().hashBytes(bytes).asBytes()); | ||
if (!exerciseDeserializationForTesting) { | ||
fingerprintToValue.putIfAbsent(fingerprint, immediateFuture(info)); | ||
} | ||
return Futures.transform( | ||
storageEndpoint.put(fingerprint, bytes), unusedVoid -> fingerprint, directExecutor()); | ||
} | ||
} |