From 6f75fd292f30fefc2eccba1e7bb5576e5ce332f1 Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Mon, 13 Jun 2022 16:54:17 -0400 Subject: [PATCH] Adjust to protocol changes --- .../jdbc/test/JdbcSourceAcceptanceTest.java | 67 +++++++++---------- .../PostgresJdbcSourceAcceptanceTest.java | 5 +- 2 files changed, 34 insertions(+), 38 deletions(-) diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index b4dd8a59465f..2e3a6578a9f5 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -354,12 +354,15 @@ void testDiscoverWithMultipleSchemas() throws Exception { final AirbyteCatalog actual = source.discover(config); final AirbyteCatalog expected = getCatalog(getDefaultNamespace()); - expected.getStreams().add(CatalogHelpers + final List catalogStreams = new ArrayList<>(); + catalogStreams.addAll(expected.getStreams()); + catalogStreams.add(CatalogHelpers .createAirbyteStream(TABLE_NAME, SCHEMA_NAME2, Field.of(COL_ID, JsonSchemaType.STRING), Field.of(COL_NAME, JsonSchemaType.STRING)) .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))); + expected.setStreams(catalogStreams); // sort streams by name so that we are comparing lists with the same order. final Comparator schemaTableCompare = Comparator.comparing(stream -> stream.getNamespace() + "." + stream.getName()); expected.getStreams().sort(schemaTableCompare); @@ -661,9 +664,7 @@ protected List getExpectedAirbyteMessagesSecondSync(final String .withStreamNamespace(namespace) .withCursorField(List.of(COL_ID)) .withCursor("5"); - expectedMessages.add(new AirbyteMessage() - .withType(Type.STATE) - .withState(Jsons.object(createState(List.of(state)), AirbyteStateMessage.class))); + expectedMessages.addAll(createExpectedTestMessages(List.of(state))); return expectedMessages; } @@ -734,9 +735,9 @@ void testReadMultipleTablesIncrementally() throws Exception { .withCursor("3")); final List expectedMessagesFirstSync = new ArrayList<>(getTestMessages()); - expectedMessagesFirstSync.add(createExpectedTestMessage(expectedStateStreams1)); + expectedMessagesFirstSync.addAll(createExpectedTestMessages(expectedStateStreams1)); expectedMessagesFirstSync.addAll(secondStreamExpectedMessages); - expectedMessagesFirstSync.add(createExpectedTestMessage(expectedStateStreams2)); + expectedMessagesFirstSync.addAll(createExpectedTestMessages(expectedStateStreams2)); setEmittedAtToNull(actualMessagesFirstSync); @@ -803,7 +804,7 @@ private void incrementalCursorCheck( .withCursor(initialCursorValue); final List actualMessages = MoreIterators - .toList(source.read(config, configuredCatalog, createState(List.of(dbStreamState)))); + .toList(source.read(config, configuredCatalog, Jsons.jsonNode(createState(List.of(dbStreamState))))); setEmittedAtToNull(actualMessages); @@ -814,7 +815,7 @@ private void incrementalCursorCheck( .withCursorField(List.of(cursorField)) .withCursor(endCursorValue)); final List expectedMessages = new ArrayList<>(expectedRecordMessages); - expectedMessages.add(createExpectedTestMessage(expectedStreams)); + expectedMessages.addAll(createExpectedTestMessages(expectedStreams)); assertEquals(actualMessages.size(), expectedMessages.size()); assertEquals(actualMessages, expectedMessages); @@ -883,10 +884,30 @@ protected List getTestMessages() { COL_UPDATED_AT, "2006-10-19T00:00:00Z"))))); } - protected AirbyteMessage createExpectedTestMessage(final List states) { - return new AirbyteMessage() - .withType(Type.STATE) - .withState(Jsons.object(createState(states), AirbyteStateMessage.class)); + protected List createExpectedTestMessages(final List states) { + return supportsPerStream() + ? states.stream() + .map(s -> new AirbyteMessage().withType(Type.STATE) + .withState(new AirbyteStateMessage().withStateType(AirbyteStateType.STREAM) + .withStream(new AirbyteStreamState() + .withStreamDescriptor(new StreamDescriptor().withNamespace(s.getStreamNamespace()).withName(s.getStreamName())) + .withStreamState(Jsons.jsonNode(s))))) + .collect( + Collectors.toList()) + : List.of(new AirbyteMessage().withType(Type.STATE).withState(new AirbyteStateMessage().withStateType(AirbyteStateType.LEGACY) + .withData(Jsons.jsonNode(new DbState().withCdc(false).withStreams(states))))); + } + + protected List createState(final List states) { + return supportsPerStream() + ? states.stream() + .map(s -> new AirbyteStateMessage().withStateType(AirbyteStateType.STREAM) + .withStream(new AirbyteStreamState() + .withStreamDescriptor(new StreamDescriptor().withNamespace(s.getStreamNamespace()).withName(s.getStreamName())) + .withStreamState(Jsons.jsonNode(s)))) + .collect( + Collectors.toList()) + : List.of(new AirbyteStateMessage().withStateType(AirbyteStateType.LEGACY).withData(Jsons.jsonNode(new DbState().withStreams(states)))); } protected ConfiguredAirbyteStream createTableWithSpaces() throws SQLException { @@ -1011,28 +1032,6 @@ protected JsonNode createEmptyState(final String streamName, final String stream } } - /** - * Creates state with the provided stream(s). - * - * @param streams A list of streams. - * @return A {@link JsonNode} representation of the state with the provided stream state. - */ - protected JsonNode createState(final List streams) { - if (supportsPerStream()) { - final List messages = streams.stream() - .map(s -> new AirbyteStateMessage().withStateType(AirbyteStateType.STREAM) - .withStream(new AirbyteStreamState() - .withStreamDescriptor(new StreamDescriptor().withName(s.getStreamName()).withNamespace(s.getStreamNamespace())) - .withStreamState(Jsons.jsonNode(s)))) - .collect(Collectors.toList()); - return Jsons.jsonNode(messages); - } else { - final DbState dbState = new DbState() - .withStreams(streams.stream().collect(Collectors.toList())); - return Jsons.jsonNode(dbState); - } - } - /** * Extracts the state component from the provided {@link AirbyteMessage} based on the value returned * by {@link #supportsPerStream()}. diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index 81f18cca890b..1dba9e85dc3c 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -26,7 +26,6 @@ import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; -import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.ConnectorSpecification; @@ -435,9 +434,7 @@ protected List getExpectedAirbyteMessagesSecondSync(final String .withStreamNamespace(namespace) .withCursorField(ImmutableList.of(COL_ID)) .withCursor("5"); - expectedMessages.add(new AirbyteMessage() - .withType(AirbyteMessage.Type.STATE) - .withState(Jsons.object(createState(List.of(state)), AirbyteStateMessage.class))); + expectedMessages.addAll(createExpectedTestMessages(List.of(state))); return expectedMessages; }