Skip to content

Commit

Permalink
[Bug][Connector-V2][Doris] update last checkpoint id when doing snaps…
Browse files Browse the repository at this point in the history
…hot (apache#4881)
  • Loading branch information
gnehil authored and EricJoy2048 committed Jul 11, 2023
1 parent 2cb2e67 commit 16457df
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo
private static final int CONNECT_TIMEOUT = 1000;
private static final List<String> DORIS_SUCCESS_STATUS =
new ArrayList<>(Arrays.asList(LoadStatus.SUCCESS, LoadStatus.PUBLISH_TIMEOUT));
private final long lastCheckpointId;
private long lastCheckpointId;
private DorisStreamLoad dorisStreamLoad;
volatile boolean loading;
private final DorisConfig dorisConfig;
Expand Down Expand Up @@ -156,7 +156,8 @@ public List<DorisSinkState> snapshotState(long checkpointId) throws IOException
this.dorisStreamLoad.setHostPort(getAvailableBackend());
this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1));
this.loading = true;
return Collections.singletonList(dorisSinkState);
this.lastCheckpointId = checkpointId;
return Collections.singletonList(new DorisSinkState(labelPrefix, lastCheckpointId));
}

@Override
Expand Down

0 comments on commit 16457df

Please sign in to comment.