Skip to content

Commit

Permalink
[Bugfix][CDC Base] Solving the ConcurrentModificationException caused…
Browse files Browse the repository at this point in the history
… by snapshotState being modified concurrently. (#4877)
  • Loading branch information
ic4y authored Jun 5, 2023
1 parent f4077cb commit 9a2efa5
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
(CompletedSnapshotSplitsReportEvent) sourceEvent;
List<SnapshotSplitWatermark> completedSplitWatermarks =
reportEvent.getCompletedSnapshotSplitWatermarks();
splitAssigner.onCompletedSplits(completedSplitWatermarks);
synchronized (context) {
splitAssigner.onCompletedSplits(completedSplitWatermarks);
}

// send acknowledge event
CompletedSnapshotSplitsAckEvent ackEvent =
Expand Down Expand Up @@ -153,7 +155,10 @@ private void assignSplits() {
continue;
}

Optional<SourceSplitBase> split = splitAssigner.getNext();
Optional<SourceSplitBase> split;
synchronized (context) {
split = splitAssigner.getNext();
}
if (split.isPresent()) {
final SourceSplitBase sourceSplit = split.get();
context.assignSplit(nextAwaiting, sourceSplit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public Lsn getCommitLsn() {
return Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY));
}

public Long getEventSerialNo() {
return Long.valueOf(offset.get(SourceInfo.EVENT_SERIAL_NO_KEY));
public Object getEventSerialNo() {
return offset.get(SourceInfo.EVENT_SERIAL_NO_KEY);
}

public int compareTo(Offset o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public void triggerBarrier(Barrier barrier) throws Exception {
final long barrierId = barrier.getId();
Serializable snapshotState = null;
byte[] serialize = null;
// Do not modify this lock object, as it is also used in the SourceSplitEnumerator.
synchronized (enumeratorContext) {
if (barrier.snapshot()) {
snapshotState = enumerator.snapshotState(barrierId);
Expand Down

0 comments on commit 9a2efa5

Please sign in to comment.