Skip to content

Commit

Permalink
do not abort instant when some write task was failed
Browse files Browse the repository at this point in the history
  • Loading branch information
wxplovecc committed Jun 24, 2022
1 parent 237c256 commit 6099161
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,12 @@ public void notifyCheckpointComplete(long checkpointId) {
@Override
public void notifyCheckpointAborted(long checkpointId) {
if (checkpointId == this.checkpointId) {
executor.execute(() -> {
this.ckpMetadata.abortInstant(this.instant);
}, "abort instant %s", this.instant);
// write task failed we do not reuse the instant
if (Arrays.stream(eventBuffer).anyMatch(s -> s == null)) {
executor.execute(() -> {
this.ckpMetadata.abortInstant(this.instant);
}, "abort instant %s", this.instant);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,6 @@ protected String instantToWrite(boolean hasData) {
* Returns whether the pending instant is invalid to write with.
*/
private boolean invalidInstant(String instant, boolean hasData) {
return instant.equals(this.currentInstant) && hasData && this.ckpMetadata.isAborted(instant);
return instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant);
}
}

0 comments on commit 6099161

Please sign in to comment.