Fix race condition in Arbitrary/BroadcastOutputBuffer #15155
A buffer may transition to ABORTED while the final OutputBuffers (with the noMoreBuffers flag set to true) are being set.
Why did this problem start occurring now?
checkState(stateMachine.getState().canAddBuffers() || !outputBuffers.isNoMoreBufferIds(), "No more buffers already set");
BufferState state = stateMachine.getState();
// buffer may become aborted while the final output buffers are being set
checkState(state == ABORTED || state.canAddBuffers() || !outputBuffers.isNoMoreBufferIds(), "No more buffers already set");
Shouldn't you destroy the buffer immediately if the state is ABORTED, as is done for FINISHED below?
    if (stateMachine.getState() == FINISHED) {
        buffer.destroy();
    }
This is something that is very non-obvious in the OutputBuffers implementation. We destroy buffers only if an upstream task finished successfully, because "destroy" usually propagates a "success" signal to a downstream task. If an upstream task failed, however, we don't destroy the buffers; we wait for the coordinator to kill the downstream tasks.
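The rule described in this comment can be sketched as follows. This is a minimal illustration, not the actual Trino code: `DestroyOnFinishSketch`, `ClientBufferSketch`, and `onTerminalState` are hypothetical names, and the enum is a reduced stand-in for the real `BufferState`.

```java
import java.util.List;

public class DestroyOnFinishSketch
{
    // Hypothetical, reduced stand-in for the real BufferState enum
    enum BufferState { OPEN, FINISHED, ABORTED }

    // Hypothetical stand-in for a per-consumer client buffer
    static class ClientBufferSketch
    {
        private boolean destroyed;

        // In this sketch, destroying a buffer is what signals "upstream succeeded" downstream
        void destroy()
        {
            destroyed = true;
        }

        boolean isDestroyed()
        {
            return destroyed;
        }
    }

    // Assumed helper: invoked once the output buffer reaches a terminal state
    static void onTerminalState(BufferState state, List<ClientBufferSketch> buffers)
    {
        if (state == BufferState.FINISHED) {
            // Successful completion: destroy buffers so consumers see a clean end of data
            buffers.forEach(ClientBufferSketch::destroy);
        }
        // ABORTED: leave buffers untouched so no "success" signal is sent;
        // the coordinator is expected to kill the downstream tasks
    }

    public static void main(String[] args)
    {
        List<ClientBufferSketch> finished = List.of(new ClientBufferSketch());
        List<ClientBufferSketch> aborted = List.of(new ClientBufferSketch());
        onTerminalState(BufferState.FINISHED, finished);
        onTerminalState(BufferState.ABORTED, aborted);
        System.out.println("destroyed after FINISHED: " + finished.get(0).isDestroyed());
        System.out.println("destroyed after ABORTED: " + aborted.get(0).isDestroyed());
    }
}
```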
checkState(stateMachine.getState().canAddBuffers() || !outputBuffers.isNoMoreBufferIds(), "No more buffers already set");
BufferState state = stateMachine.getState();
// buffer may become aborted while the final output buffers are being set
checkState(state == ABORTED || state.canAddBuffers() || !outputBuffers.isNoMoreBufferIds(), "No more buffers already set");

// NOTE: buffers are allowed to be created before they are explicitly declared by setOutputBuffers
// When no-more-buffers is set, we verify that all created buffers have been declared
Reuse state in the if (stateMachine.getState() == FINISHED) { check below? Is there a chance of a race condition?
In theory there shouldn't be a race condition, as getBuffer is synchronized, and safeGetBuffersSnapshot(), used by finish, is also synchronized. But I'm still reluctant to change the semantics.
The reason it looks like a race is that getBuffer is synchronized but the buffer state machine isn't, so I thought that the state might change while getBuffer is in progress.
Neither io.trino.execution.buffer.ArbitraryOutputBuffer#setNoMorePages nor io.trino.execution.buffer.ArbitraryOutputBuffer#destroy() is synchronized.
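The concern raised here can be made concrete with a small sketch. It assumes a state holder that is updated without taking the buffer's monitor (modeled with an `AtomicReference`); `StateSnapshotSketch`, `abort`, and `twoReadsAgree` are hypothetical names, and the `Runnable` hook simulates what another thread could do between two reads so the outcome is deterministic.

```java
import java.util.concurrent.atomic.AtomicReference;

public class StateSnapshotSketch
{
    // Hypothetical, reduced stand-in for the real BufferState enum
    enum BufferState { OPEN, FINISHED, ABORTED }

    // Stand-in for the state machine: updates do not take this object's monitor
    private final AtomicReference<BufferState> state = new AtomicReference<>(BufferState.OPEN);

    // Deliberately not synchronized, so it can run while a synchronized method is executing
    void abort()
    {
        state.set(BufferState.ABORTED);
    }

    // Synchronized, yet the state may still change between the two reads.
    // The hook stands in for a concurrent caller of abort().
    synchronized boolean twoReadsAgree(Runnable betweenReads)
    {
        BufferState first = state.get();
        betweenReads.run();
        BufferState second = state.get();
        return first == second;
    }

    public static void main(String[] args)
    {
        StateSnapshotSketch quiet = new StateSnapshotSketch();
        System.out.println("no concurrent change, reads agree: " + quiet.twoReadsAgree(() -> {}));

        StateSnapshotSketch racy = new StateSnapshotSketch();
        System.out.println("abort between reads, reads agree: " + racy.twoReadsAgree(racy::abort));
    }
}
```

Reading the state once into a local variable and reusing it, as the patch does with `BufferState state`, gives every later check in the method the same snapshot.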
@@ -370,7 +370,8 @@ private synchronized ClientBuffer getBuffer(OutputBufferId id)
  // without a clean "no-more-buffers" message from the scheduler. This happens with limit queries and is ok because
  // the buffer will be immediately destroyed.
  BufferState state = stateMachine.getState();
- checkState(state.canAddBuffers() || !outputBuffers.isNoMoreBufferIds(), "No more buffers already set");
+ // buffer may become aborted while the final output buffers are being set
+ checkState(state == ABORTED || state.canAddBuffers() || !outputBuffers.isNoMoreBufferIds(), "No more buffers already set");
I don't understand. We already have the if (state != ABORTED) { check below. Was it always true before?
Not necessarily.
See state.canAddBuffers() || !outputBuffers.isNoMoreBufferIds(). When the buffer state is ABORTED, the state.canAddBuffers() check doesn't pass; however, it is still possible for the !outputBuffers.isNoMoreBufferIds() check to pass.
It looks like an ancient problem. I think it was discovered only now because we didn't have much test coverage for error propagation.
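To make the two conditions easier to compare, here is a small sketch that evaluates the old and new preconditions as plain boolean functions. The enum is a simplified assumption (the real `BufferState` has more states), and `oldCheck`/`newCheck` are hypothetical helper names that mirror the expressions passed to `checkState` in the diff.

```java
public class AbortedCheckSketch
{
    // Simplified, hypothetical state enum; only OPEN is assumed to allow adding buffers
    enum BufferState
    {
        OPEN(true),
        NO_MORE_BUFFERS(false),
        FINISHED(false),
        ABORTED(false);

        private final boolean canAddBuffers;

        BufferState(boolean canAddBuffers)
        {
            this.canAddBuffers = canAddBuffers;
        }

        boolean canAddBuffers()
        {
            return canAddBuffers;
        }
    }

    // Condition before the patch
    static boolean oldCheck(BufferState state, boolean noMoreBufferIds)
    {
        return state.canAddBuffers() || !noMoreBufferIds;
    }

    // Condition after the patch: ABORTED is always tolerated
    static boolean newCheck(BufferState state, boolean noMoreBufferIds)
    {
        return state == BufferState.ABORTED || state.canAddBuffers() || !noMoreBufferIds;
    }

    public static void main(String[] args)
    {
        for (BufferState state : BufferState.values()) {
            for (boolean noMoreBufferIds : new boolean[] {false, true}) {
                System.out.printf("%-16s noMoreBufferIds=%-5b old=%-5b new=%b%n",
                        state, noMoreBufferIds, oldCheck(state, noMoreBufferIds), newCheck(state, noMoreBufferIds));
            }
        }
    }
}
```

Under these assumptions the two conditions differ only for ABORTED with the no-more-buffers flag already set, which is the case the patch targets. ABORTED with the flag unset passed the old condition too, which is why the later `state != ABORTED` branch was not always true.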
Description
A buffer may transition to ABORTED while the final OutputBuffers (with the noMoreBuffers flag set to true) are being set.
Additional context and related issues
Fixes #14814 (comment)
Release notes
(X) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: