Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per-stream state support for Postgres source #13609

Merged
merged 34 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a2db8c0
WIP Per-stream state support for Postgres source
jdpgrailsdev Jun 8, 2022
8ebec44
Fix failing test
jdpgrailsdev Jun 8, 2022
5158bad
Improve code coverage
jdpgrailsdev Jun 8, 2022
960919c
Make global the default state manager
jdpgrailsdev Jun 8, 2022
89bc6a0
Add legacy adapter state manager
jdpgrailsdev Jun 9, 2022
fa0a742
Formatting
jdpgrailsdev Jun 9, 2022
a43cbb8
Include legacy state for backwards compatibility
jdpgrailsdev Jun 10, 2022
b108604
Add global state manager
jdpgrailsdev Jun 10, 2022
f3f2499
Implement Global/CDC state handling
jdpgrailsdev Jun 13, 2022
9825df7
Fix test issues
jdpgrailsdev Jun 13, 2022
2d1b954
Fix issue with updated method signature
jdpgrailsdev Jun 13, 2022
15cd5d2
Handle empty state case in global state manager
jdpgrailsdev Jun 13, 2022
80a497a
Adjust to protocol changes
jdpgrailsdev Jun 13, 2022
e8b853b
Fix failing acceptance tests
jdpgrailsdev Jun 14, 2022
9a940e4
Fix failing test
jdpgrailsdev Jun 14, 2022
be766c7
Fix unmodifiable list issue
jdpgrailsdev Jun 14, 2022
e34efc1
Fix unmodifiable exception
jdpgrailsdev Jun 14, 2022
9c9aefb
PR feedback
jdpgrailsdev Jun 14, 2022
9e8e1ae
Abstract global state manager selection
jdpgrailsdev Jun 14, 2022
9a9c782
Handle conversion between different state types
jdpgrailsdev Jun 14, 2022
5c83969
Handle invalid conversion
jdpgrailsdev Jun 14, 2022
fe076d7
Rename parameter
jdpgrailsdev Jun 14, 2022
c4824a3
Refactor state manager creation
jdpgrailsdev Jun 15, 2022
1a500d8
Fix failing tests
jdpgrailsdev Jun 15, 2022
d21a6a0
Fix failing integration tests
jdpgrailsdev Jun 15, 2022
1c5edbd
Add CDC test
jdpgrailsdev Jun 16, 2022
b478d57
Fix failing integration test
jdpgrailsdev Jun 16, 2022
ae867bf
Revert change
jdpgrailsdev Jun 16, 2022
b753b14
Fix failing integration test
jdpgrailsdev Jun 16, 2022
969de14
Use per-stream for postgres tests
jdpgrailsdev Jun 16, 2022
5c0ba55
Formatting
jdpgrailsdev Jun 17, 2022
cf079be
Correct stream descriptor validation
jdpgrailsdev Jun 17, 2022
12dd14c
Correct permalink
jdpgrailsdev Jun 17, 2022
ef81d69
PR feedback
jdpgrailsdev Jun 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ void testDelete() throws Exception {
.format("DELETE FROM %s.%s WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID,
11));

final JsonNode state = stateMessages1.get(0).getData();
final JsonNode state = Jsons.jsonNode(stateMessages1);
final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
.read(getConfig(), CONFIGURED_CATALOG, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
Expand Down Expand Up @@ -347,7 +347,7 @@ void testUpdate() throws Exception {
.format("UPDATE %s.%s SET %s = '%s' WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME,
COL_MODEL, updatedModel, COL_ID, 11));

final JsonNode state = stateMessages1.get(0).getData();
final JsonNode state = Jsons.jsonNode(stateMessages1);
final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
.read(getConfig(), CONFIGURED_CATALOG, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
Expand Down Expand Up @@ -403,7 +403,7 @@ void testRecordsProducedDuringAndAfterSync() throws Exception {
recordsCreated[0]++;
}

final JsonNode state = stateAfterFirstBatch.get(0).getData();
final JsonNode state = Jsons.jsonNode(stateAfterFirstBatch);
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource()
.read(getConfig(), CONFIGURED_CATALOG, state);
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
Expand Down Expand Up @@ -492,7 +492,7 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
.jsonNode(ImmutableMap.of(COL_ID, 100, COL_MAKE_ID, 3, COL_MODEL, "Punto"));
writeModelRecord(puntoRecord);

final JsonNode state = extractStateMessages(actualRecords1).get(0).getData();
final JsonNode state = Jsons.jsonNode(extractStateMessages(actualRecords1));
final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
.read(getConfig(), configuredCatalog, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
Expand Down Expand Up @@ -535,7 +535,7 @@ void testNoDataOnSecondSync() throws Exception {
final AutoCloseableIterator<AirbyteMessage> read1 = getSource()
.read(getConfig(), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
final JsonNode state = extractStateMessages(actualRecords1).get(0).getData();
final JsonNode state = Jsons.jsonNode(extractStateMessages(actualRecords1));

final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
.read(getConfig(), CONFIGURED_CATALOG, state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ void testDelete() throws Exception {
.format("DELETE FROM %s.%s WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID,
11));

final JsonNode state = stateMessages1.get(0).getData();
final JsonNode state = Jsons.jsonNode(stateMessages1);
final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
.read(getConfig(), CONFIGURED_CATALOG, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
Expand Down Expand Up @@ -347,7 +347,7 @@ void testUpdate() throws Exception {
.format("UPDATE %s.%s SET %s = '%s' WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME,
COL_MODEL, updatedModel, COL_ID, 11));

final JsonNode state = stateMessages1.get(0).getData();
final JsonNode state = Jsons.jsonNode(stateMessages1);
final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
.read(getConfig(), CONFIGURED_CATALOG, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
Expand Down Expand Up @@ -399,7 +399,7 @@ protected void testRecordsProducedDuringAndAfterSync() throws Exception {
writeModelRecord(record);
}

final JsonNode state = stateAfterFirstBatch.get(0).getData();
final JsonNode state = Jsons.jsonNode(stateAfterFirstBatch);
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = getSource()
.read(getConfig(), CONFIGURED_CATALOG, state);
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
Expand Down Expand Up @@ -488,7 +488,7 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
.jsonNode(ImmutableMap.of(COL_ID, 100, COL_MAKE_ID, 3, COL_MODEL, "Punto"));
writeModelRecord(puntoRecord);

final JsonNode state = extractStateMessages(actualRecords1).get(0).getData();
final JsonNode state = Jsons.jsonNode(extractStateMessages(actualRecords1));
final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
.read(getConfig(), configuredCatalog, state);
final List<AirbyteMessage> actualRecords2 = AutoCloseableIterators.toListAndClose(read2);
Expand Down Expand Up @@ -531,7 +531,7 @@ void testNoDataOnSecondSync() throws Exception {
final AutoCloseableIterator<AirbyteMessage> read1 = getSource()
.read(getConfig(), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords1 = AutoCloseableIterators.toListAndClose(read1);
final JsonNode state = extractStateMessages(actualRecords1).get(0).getData();
final JsonNode state = Jsons.jsonNode(extractStateMessages(actualRecords1));

final AutoCloseableIterator<AirbyteMessage> read2 = getSource()
.read(getConfig(), CONFIGURED_CATALOG, state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
Expand Down Expand Up @@ -106,6 +107,18 @@ public abstract class SourceAcceptanceTest extends AbstractSourceConnectorTest {
*/
protected abstract JsonNode getState() throws Exception;

/**
* Tests whether the connector under test supports the per-stream state format or should use the
* legacy format for data generated by this test.
*
* @return {@code true} if the connector supports the per-stream state format or {@code false} if it
* does not support the per-stream state format (e.g. legacy format supported). Default
* value is {@code false}.
*/
protected boolean supportsPerStream() {
return false;
}

/**
* Verify that a spec operation issued to the connector returns a valid spec.
*/
Expand Down Expand Up @@ -236,7 +249,7 @@ public void testIncrementalSyncWithState() throws Exception {

// when we run incremental sync again there should be no new records. Run a sync with the latest
// state message and assert no records were emitted.
final JsonNode latestState = stateMessages.get(stateMessages.size() - 1).getData();
final JsonNode latestState = Jsons.jsonNode(supportsPerStream() ? stateMessages : List.of(Iterables.getLast(stateMessages)));
final List<AirbyteRecordMessage> secondSyncRecords = filterRecords(runRead(configuredCatalog, latestState));
assertTrue(
secondSyncRecords.isEmpty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.integrations.source.relationaldb.models.CdcState;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.sql.JDBCType;
import java.util.List;
import java.util.Set;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -82,6 +88,11 @@ public String getDriverClass() {
return PostgresTestSource.DRIVER_CLASS;
}

@Override
protected boolean supportsPerStream() {
return true;
}

@AfterAll
static void cleanUp() {
PSQL_DB.close();
Expand Down Expand Up @@ -118,6 +129,27 @@ public Set<String> getExcludedInternalNameSpaces() {
return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history");
}

// TODO This is a temporary override so that the Postgres source can take advantage of per-stream
// state
@Override
protected List<AirbyteStateMessage> generateEmptyInitialState(final JsonNode config) {
if (getSupportedStateType(config) == AirbyteStateType.GLOBAL) {
final AirbyteGlobalState globalState = new AirbyteGlobalState()
.withSharedState(Jsons.jsonNode(new CdcState()))
.withStreamStates(List.of());
return List.of(new AirbyteStateMessage().withStateType(AirbyteStateType.GLOBAL).withGlobal(globalState));
} else {
return List.of(new AirbyteStateMessage()
.withStateType(AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState()));
}
}

@Override
protected AirbyteStateType getSupportedStateType(final JsonNode config) {
return AirbyteStateType.STREAM;
}

public static void main(final String[] args) throws Exception {
final Source source = new PostgresTestSource();
LOGGER.info("starting source: {}", PostgresTestSource.class);
Expand Down
Loading