Suppress DEADLINE_EXCEEDED on download progress #5230
Conversation
@@ -174,34 +174,96 @@ public void ensureInputsPresent(
    uploader.uploadBlobs(toUpload);
  }

  class PositionalOutputStream extends OutputStream {
    private final OutputStream delegate;
    private transient long offset = 0;
why transient?
Unnecessary, dropping it
Overall a good idea. I am somewhat worried about slow downloads, where one would just want to fall back to local execution. Also, this code is used in remote caching, where slow downloads are more of an issue. However, these concerns are likely unfounded, and so I think we should merge this.
Also, can you please add some tests?
Agreed about the fallback behavior - what this is really lacking is a global heuristic/traffic-shaping mechanism that fully saturates the available bandwidth and locally executes anything after that point, up until the CPU/disk/RAM capacity. But realistically we're just talking about bin-packing, and I've not seen any indication that races are the norm and that falling back to local is not exceptional behavior.
Force-pushed from 754a2dc to 71b5b0e
        ReadRequest.newBuilder()
            .setResourceName(resourceName)
            .setReadOffset(offset)
            .build(),
Do I understand correctly that this will not work as is, because you're requesting a different offset every time but disregarding this offset when writing to out, so you would need something like the code that you commented out below to do it right?
You're completely right. The commented code is coming out; the scope of Offset (and Hashing) has to move to creation outside of the retrier.
Force-pushed from d717f8d to 1cdbf6f
I like your idea with the progress and I think we should also apply this to uploads. I think we should go even further: up/downloads should continue as long as they make progress, and the timeout should apply to all other calls, but mostly execute really, which one would expect to always take the longest. Further down the road we should probably remove the […]. WDYT?
I think the timeout is necessary in the case where you don't make progress, but that clocking progress needs to be made into an intercepted behavior on the channel, if such a thing is available in gRPC; we are essentially monitoring starvation without a centralized timer reset. Agreed on the upload addition; that actually brings up a point that the ByteStream documentation is not clear on: will (other) clients react poorly if they see a WriteResponse, for instance, for every WriteRequest with the current committed_size? Pertinent to execute, we would need to be talking about the v2 streaming API, which I am anxiously awaiting seeing client code for.
What's the status of this change?
Force-pushed from 57347de to 5fb574d
Resolved the merge conflicts. I would like to see a separate commit or issue to push out incremental uploads and the timeout stuff. I think this is a good stopgap to prevent concurrent downloads from failing due to link saturation, and if we want fundamental changes like a 'give up and fall back to local', we need a real state machine designed so that my 500-wide workerset doesn't suddenly swing its hammer down to local and smash my RAM to bits.
Ping? @buchgr, can we defer some of the other magical robustness changes to later additions?
I apologize for being absent, George! Would you mind rebasing this PR in case you are still interested in getting this merged? Thanks!
Master is merged and all CI tests passed. Let's get this one done.
A gRPC downloadBlob can be distributed over multiple sequential ByteStream::read requests, all of which will fail prior to ultimate completion or failure of the download, represented by a single ListenableFuture. To provide this abstraction, GrpcRemoteCache::downloadBlob supplies the requests with an offset that transits committed size between each request, a Retrier.Backoff that is reset each time partial content is received, and an optional hash supplier attached to a digesting filter bound to the lifetime of the download. Each request retains the count of bytes that has been committed to the output stream since it began.

When a request error is encountered during a gRPC blob download, the client proceeds as follows:

- If the error is not retriable as determined by the GrpcActionCache's retrier, the blob download future fails.
- If the error is retriable and any positive size of partial content has been committed to the output stream since the request began, a new request is initiated with a read from the current request offset plus the committed size.
- If the error is retriable and no partial content has been committed to the output stream, a delay is acquired from its backoff. If the delay is negative, the download fails immediately with the error. Otherwise a new request is initiated with a read from the current request offset.

All retry and gRPC Context logic has been removed from the AbstractRemoteActionCache, making the individual ActionCache/store implementations responsible for their own retry mechanisms.
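For readers following the thread, the decision described above boils down to roughly the sketch below. This is illustrative only, not the actual diff in this PR: onReadError, startRead, retryScheduler, committedSinceStart, and the retrier/backoff method names are assumptions, with Retrier.Backoff assumed to hand back a negative delay once its attempts are exhausted, as the description says.

  // Illustrative sketch of the per-request retry decision described above.
  // `offset` is the absolute read position shared across attempts,
  // `committedSinceStart` is the byte count the just-failed request wrote
  // to the output stream, and `outerF` represents the whole download.
  private void onReadError(
      String resourceName,
      Throwable t,
      AtomicLong offset,
      long committedSinceStart,
      Retrier.Backoff backoff,
      SettableFuture<Void> outerF) {
    if (!retrier.isRetriable(t)) {
      // Not retriable: the single download future fails.
      outerF.setException(t);
      return;
    }
    if (committedSinceStart > 0) {
      // Partial content arrived: advance the offset, take a fresh backoff,
      // and immediately issue a new read from the new position.
      offset.addAndGet(committedSinceStart);
      startRead(resourceName, offset, retrier.newBackoff(), outerF);
      return;
    }
    // No progress at all: consume a backoff delay. A negative delay means
    // the backoff is exhausted and the download fails with the error.
    long delayMillis = backoff.nextDelayMillis();
    if (delayMillis < 0) {
      outerF.setException(t);
      return;
    }
    retryScheduler.schedule(
        () -> startRead(resourceName, offset, backoff, outerF),
        delayMillis,
        TimeUnit.MILLISECONDS);
  }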
Force-pushed from f03fa20 to 1ac9d1f
The stalled backoff, updated since last progress, should be the only backoff used in the observer object.
@philwo he and I spoke offline; I will do so as soon as he's back on his feet.
Avoid considering non-DEADLINE_EXCEEDED results when deciding whether to use a retry attempt/backoff delay.
src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
  private ListenableFuture<Void> requestRead(
      String resourceName,
      AtomicLong offset,
While I appreciate your ingenuity, I'd argue that this is quite the hack. I am wondering if this whole logic couldn't be implemented in the Retrier itself, i.e.:
class ProgressiveRetrier extends Retrier {
  public ListenableFuture<Void> executeAsync(AsyncCallable<Long> call, long totalBytes) {
    return executeAsync(new AsyncCallable<Void>() {
      // implement progressive retry logic in here. in case of deadline exceeded and having made progress
      // immediately retry.
    }, newBackoff());
  }
}
I haven't fully thought it through, but I think this could be made generic enough that it could also be used in the ByteStreamUploader and the HttpBlobStore. WDYT?
Whatever gets me to an accept at this point.
Assuming that this comment addresses the progressive offset holder, I don't see how totalBytes can be used in the retrier to effect this: call presents an offset-worthy result only in the case of non-exception, but DEADLINE_EXCEEDED will be an exception state for the future. Do you expect the call to swallow the DEADLINE_EXCEEDED and respond with a committedSize, expecting the retrier to recall until it matches totalBytes? If so, the call still needs to maintain the current offset (per the AtomicLong), since there's no way to pass it back to the subsequent one.
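To make that last point concrete, here is a minimal sketch of the wiring, assuming an executeAsync(AsyncCallable, Backoff) entry point along the lines sketched above and abbreviating requestRead's argument list (out stands in for the destination stream):

  // The AtomicLong is the only state that survives between retry attempts:
  // the retrier just re-invokes the same AsyncCallable, so each attempt has
  // to pick up its starting position from the shared holder.
  AtomicLong offset = new AtomicLong(0);
  ListenableFuture<Void> download =
      retrier.executeAsync(
          () -> requestRead(resourceName, offset, out),
          retrier.newBackoff());
  // requestRead advances `offset` as chunks are committed to the output
  // stream, so a retried attempt resumes from offset.get() rather than zero.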
              outerF.setException(t);
            }
          },
          Context.current().fixedContextExecutor(MoreExecutors.directExecutor()));
currentContextExecutor(...)?
Negative. currentContextExecutor will take the thread's context from the caller of execute, not from the call of the method. In this case that is a scheduled executor thread for the retrier. I spent hours trying to figure out how currentContextExecutor was doing me wrong before I realized how. It has all the problems of directExecutor with a confusing name.
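For anyone else tripped up by this, a small sketch of the difference, assuming the io.grpc.Context API and Guava's MoreExecutors; the variable names are illustrative:

  // fixedContextExecutor pins the gRPC Context that is current *here*, at
  // the point where the executor is created (i.e. the download caller):
  Executor pinned =
      Context.current().fixedContextExecutor(MoreExecutors.directExecutor());

  // currentContextExecutor instead propagates whatever Context is current on
  // the thread that later calls execute() - in this code path that is the
  // retrier's scheduled executor thread, not the original caller:
  Executor ambient =
      Context.currentContextExecutor(MoreExecutors.directExecutor());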
Prevent DEADLINE_EXCEEDED from contributing to the retry counter when it
is making progress, and preserve progress in between retry attempts on a
single file.