Skip to content

Commit

Permalink
fix: MultiThreadTest flakiness
Browse files Browse the repository at this point in the history
This test was only flaky when running it using testcontainers. The problem lies in the RecordStreamSourceWrapper. In this class we would get all the records from the engine (RecordStreamSource). These records would be mapped to json and put into a list. The problem for this test was that we just override all records everytime we request them from the engine.

With this change we keep track of the latest event we've mapped. When we get a new request we will fetch all the records of the engine. We will filter those so we only have the records that we haven't stored in our list yet. These will be mapped and added. When the mapping is done we return a copy of the list. We need to make sure no more records are added, because the calling method will iterate over this list.
  • Loading branch information
remcowesterhoud committed Mar 3, 2022
1 parent 35b33a5 commit 9892291
Showing 1 changed file with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,31 @@

import io.camunda.zeebe.process.test.api.RecordStreamSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.StreamSupport;

public class RecordStreamSourceWrapper {

private final List<String> mappedRecords = new ArrayList<>();
private final RecordStreamSource recordStreamSource;
private volatile long lastEventPosition = -1L;

public RecordStreamSourceWrapper(final RecordStreamSource recordStreamSource) {
this.recordStreamSource = recordStreamSource;
}

public List<String> getMappedRecords() {
synchronized (mappedRecords) {
mappedRecords.clear();
recordStreamSource.records().forEach(record -> mappedRecords.add(record.toJson()));
StreamSupport.stream(recordStreamSource.records().spliterator(), false)
.filter(record -> record.getPosition() > lastEventPosition)
.forEach(
record -> {
mappedRecords.add(record.toJson());
lastEventPosition = record.getPosition();
});
}
return new ArrayList<>(mappedRecords);

return Collections.unmodifiableList(mappedRecords);
}
}

0 comments on commit 9892291

Please sign in to comment.