Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unit tests in source relational db #18789

Merged
merged 4 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ void testToStateFromLegacyState() {
.stream().sorted(Comparator.comparing(DbStreamState::getStreamName)).collect(Collectors.toList()));
final StateManager stateManager = new GlobalStateManager(new AirbyteStateMessage().withData(Jsons.jsonNode(dbState)), catalog);

final long expectedRecordCount = 19L;
final DbState expectedDbState = new DbState()
.withCdc(true)
.withCdcState(cdcState)
Expand All @@ -98,7 +99,8 @@ void testToStateFromLegacyState() {
.withStreamName(STREAM_NAME1)
.withStreamNamespace(NAMESPACE)
.withCursorField(List.of(CURSOR_FIELD1))
.withCursor("a"),
.withCursor("a")
.withCursorRecordCount(expectedRecordCount),
new DbStreamState()
.withStreamName(STREAM_NAME2)
.withStreamNamespace(NAMESPACE)
Expand All @@ -117,7 +119,8 @@ void testToStateFromLegacyState() {
.withStreamName(STREAM_NAME1)
.withStreamNamespace(NAMESPACE)
.withCursorField(List.of(CURSOR_FIELD1))
.withCursor("a"))),
.withCursor("a")
.withCursorRecordCount(expectedRecordCount))),
new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withName(STREAM_NAME2).withNamespace(NAMESPACE))
.withStreamState(Jsons.jsonNode(new DbStreamState()
Expand All @@ -135,7 +138,7 @@ void testToStateFromLegacyState() {
.withGlobal(expectedGlobalState)
.withType(AirbyteStateType.GLOBAL);

final AirbyteStateMessage actualFirstEmission = stateManager.updateAndEmit(NAME_NAMESPACE_PAIR1, "a", 1L);
final AirbyteStateMessage actualFirstEmission = stateManager.updateAndEmit(NAME_NAMESPACE_PAIR1, "a", expectedRecordCount);
assertEquals(expected, actualFirstEmission);
}

Expand Down Expand Up @@ -187,7 +190,8 @@ void testToState() {
.withStreamName(STREAM_NAME1)
.withStreamNamespace(NAMESPACE)
.withCursorField(List.of(CURSOR_FIELD1))
.withCursor("a"))),
.withCursor("a")
.withCursorRecordCount(1L))),
new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withName(STREAM_NAME2).withNamespace(NAMESPACE))
.withStreamState(Jsons.jsonNode(new DbStreamState()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ void testCreationFromInvalidState() {
@Test
void testGetters() {
final List<AirbyteStateMessage> state = new ArrayList<>();
state.add(createStreamState(STREAM_NAME1, NAMESPACE, List.of(CURSOR_FIELD1), CURSOR));
state.add(createStreamState(STREAM_NAME2, NAMESPACE, List.of(), null));
state.add(createStreamState(STREAM_NAME1, NAMESPACE, List.of(CURSOR_FIELD1), CURSOR, 0L));
state.add(createStreamState(STREAM_NAME2, NAMESPACE, List.of(), null, 0L));

final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog()
.withStreams(List.of(
Expand Down Expand Up @@ -113,8 +113,7 @@ void testToState() {
.withStreamName(STREAM_NAME1)
.withStreamNamespace(NAMESPACE)
.withCursorField(List.of(CURSOR_FIELD1))
.withCursor("a")
.withCursorRecordCount(1L),
.withCursor("a"),
new DbStreamState()
.withStreamName(STREAM_NAME2)
.withStreamNamespace(NAMESPACE)
Expand All @@ -124,11 +123,12 @@ void testToState() {
.withStreamNamespace(NAMESPACE))
.stream().sorted(Comparator.comparing(DbStreamState::getStreamName)).collect(Collectors.toList()));
final AirbyteStateMessage expectedFirstEmission =
createStreamState(STREAM_NAME1, NAMESPACE, List.of(CURSOR_FIELD1), "a").withData(Jsons.jsonNode(expectedFirstDbState));
createStreamState(STREAM_NAME1, NAMESPACE, List.of(CURSOR_FIELD1), "a", 0L).withData(Jsons.jsonNode(expectedFirstDbState));

final AirbyteStateMessage actualFirstEmission = stateManager.updateAndEmit(NAME_NAMESPACE_PAIR1, "a");
assertEquals(expectedFirstEmission, actualFirstEmission);

final long expectedRecordCount = 17L;
final DbState expectedSecondDbState = new DbState()
.withCdc(false)
.withStreams(List.of(
Expand All @@ -141,15 +141,16 @@ void testToState() {
.withStreamName(STREAM_NAME2)
.withStreamNamespace(NAMESPACE)
.withCursorField(List.of(CURSOR_FIELD2))
.withCursor("b"),
.withCursor("b")
.withCursorRecordCount(expectedRecordCount),
new DbStreamState()
.withStreamName(STREAM_NAME3)
.withStreamNamespace(NAMESPACE))
.stream().sorted(Comparator.comparing(DbStreamState::getStreamName)).collect(Collectors.toList()));
final AirbyteStateMessage expectedSecondEmission =
createStreamState(STREAM_NAME2, NAMESPACE, List.of(CURSOR_FIELD2), "b").withData(Jsons.jsonNode(expectedSecondDbState));
createStreamState(STREAM_NAME2, NAMESPACE, List.of(CURSOR_FIELD2), "b", expectedRecordCount).withData(Jsons.jsonNode(expectedSecondDbState));

final AirbyteStateMessage actualSecondEmission = stateManager.updateAndEmit(NAME_NAMESPACE_PAIR2, "b");
final AirbyteStateMessage actualSecondEmission = stateManager.updateAndEmit(NAME_NAMESPACE_PAIR2, "b", expectedRecordCount);
assertEquals(expectedSecondEmission, actualSecondEmission);
}

Expand Down Expand Up @@ -228,7 +229,7 @@ void testToStateNullCursorField() {
.stream().sorted(Comparator.comparing(DbStreamState::getStreamName)).collect(Collectors.toList()));

final AirbyteStateMessage expectedFirstEmission =
createStreamState(STREAM_NAME1, NAMESPACE, List.of(CURSOR_FIELD1), "a").withData(Jsons.jsonNode(expectedFirstDbState));
createStreamState(STREAM_NAME1, NAMESPACE, List.of(CURSOR_FIELD1), "a", 0L).withData(Jsons.jsonNode(expectedFirstDbState));
final AirbyteStateMessage actualFirstEmission = stateManager.updateAndEmit(NAME_NAMESPACE_PAIR1, "a");
assertEquals(expectedFirstEmission, actualFirstEmission);
}
Expand All @@ -248,7 +249,8 @@ private List<AirbyteStateMessage> createDefaultState() {
private AirbyteStateMessage createStreamState(final String name,
final String namespace,
final List<String> cursorFields,
final String cursorValue) {
final String cursorValue,
final long cursorRecordCount) {
final DbStreamState dbStreamState = new DbStreamState()
.withStreamName(name)
.withStreamNamespace(namespace);
Expand All @@ -261,6 +263,10 @@ private AirbyteStateMessage createStreamState(final String name,
dbStreamState.withCursor(cursorValue);
}

if (cursorRecordCount > 0L) {
dbStreamState.withCursorRecordCount(cursorRecordCount);
}

return new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState()
Expand Down