From bf52c2afbedd569c3afbe0b2fcbb868863fc2e19 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 16 Jun 2022 13:37:05 -0700 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Server=20can=20display=20streams?= =?UTF-8?q?=20with=20the=20same=20name=20in=20different=20namespaces=20(#1?= =?UTF-8?q?3855)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../WebBackendConnectionsHandler.java | 22 ++++++--- .../WebBackendConnectionsHandlerTest.java | 49 +++++++++++++++++++ 2 files changed, 63 insertions(+), 8 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index c97b717e17fb..c947bde6997b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -220,15 +220,17 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti @VisibleForTesting protected static AirbyteCatalog updateSchemaWithDiscovery(final AirbyteCatalog original, final AirbyteCatalog discovered) { - final Map originalStreamsByName = original.getStreams() + // We can't directly use s.getStream() as the key, because it contains a bunch of other fields + // so instead we just define a quick-and-dirty record class. + final Map originalStreamsByName = original.getStreams() .stream() - .collect(toMap(s -> s.getStream().getName(), s -> s)); + .collect(toMap(s -> new Stream(s.getStream().getName(), s.getStream().getNamespace()), s -> s)); final List streams = new ArrayList<>(); for (final AirbyteStreamAndConfiguration s : discovered.getStreams()) { final AirbyteStream stream = s.getStream(); - final AirbyteStreamAndConfiguration originalStream = originalStreamsByName.get(stream.getName()); + final AirbyteStreamAndConfiguration originalStream = originalStreamsByName.get(new Stream(stream.getName(), stream.getNamespace())); final AirbyteStreamConfiguration outputStreamConfig; if (originalStream != null) { @@ -326,11 +328,6 @@ private List updateOperations(final WebBackendConnectionUpdate webBackendC return operationIds; } - private UUID getWorkspaceIdForConnection(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException { - final UUID sourceId = connectionsHandler.getConnection(connectionId).getSourceId(); - return getWorkspaceIdForSource(sourceId); - } - private UUID getWorkspaceIdForSource(final UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException { return sourceHandler.getSource(new SourceIdRequestBody().sourceId(sourceId)).getWorkspaceId(); } @@ -412,4 +409,13 @@ protected static ConnectionSearch toConnectionSearch(final WebBackendConnectionS .status(webBackendConnectionSearch.getStatus()); } + /** + * Equivalent to {@see io.airbyte.integrations.base.AirbyteStreamNameNamespacePair}. Intentionally + * not using that class because it doesn't make sense for airbyte-server to depend on + * base-java-integration. + */ + private record Stream(String name, String namespace) { + + } + } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index da293306d38e..17d3b167691a 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -17,6 +17,7 @@ import com.google.common.collect.Lists; import io.airbyte.api.model.generated.AirbyteCatalog; +import io.airbyte.api.model.generated.AirbyteStream; import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration; import io.airbyte.api.model.generated.AttemptRead; import io.airbyte.api.model.generated.AttemptStatus; @@ -773,4 +774,52 @@ public void testUpdateSchemaWithDiscoveryMergeNewStream() { assertEquals(expected, actual); } + @Test + public void testUpdateSchemaWithNamespacedStreams() { + final AirbyteCatalog original = ConnectionHelpers.generateBasicApiCatalog(); + final AirbyteStreamAndConfiguration stream1Config = original.getStreams().get(0); + final AirbyteStream stream1 = stream1Config.getStream(); + final AirbyteStream stream2 = new AirbyteStream() + .name(stream1.getName()) + .namespace("second_namespace") + .jsonSchema(stream1.getJsonSchema()) + .defaultCursorField(stream1.getDefaultCursorField()) + .supportedSyncModes(stream1.getSupportedSyncModes()) + .sourceDefinedCursor(stream1.getSourceDefinedCursor()) + .sourceDefinedPrimaryKey(stream1.getSourceDefinedPrimaryKey()); + final AirbyteStreamAndConfiguration stream2Config = new AirbyteStreamAndConfiguration() + .config(stream1Config.getConfig()) + .stream(stream2); + original.getStreams().add(stream2Config); + + final AirbyteCatalog discovered = ConnectionHelpers.generateBasicApiCatalog(); + discovered.getStreams().get(0).getStream() + .name("stream1") + .jsonSchema(CatalogHelpers.fieldsToJsonSchema(Field.of("field1", JsonSchemaType.STRING))) + .supportedSyncModes(List.of(SyncMode.FULL_REFRESH)); + discovered.getStreams().get(0).getConfig() + .syncMode(SyncMode.FULL_REFRESH) + .cursorField(Collections.emptyList()) + .destinationSyncMode(DestinationSyncMode.OVERWRITE) + .primaryKey(Collections.emptyList()) + .aliasName("stream1"); + + final AirbyteCatalog expected = ConnectionHelpers.generateBasicApiCatalog(); + expected.getStreams().get(0).getStream() + .name("stream1") + .jsonSchema(CatalogHelpers.fieldsToJsonSchema(Field.of("field1", JsonSchemaType.STRING))) + .supportedSyncModes(List.of(SyncMode.FULL_REFRESH)); + expected.getStreams().get(0).getConfig() + .syncMode(SyncMode.FULL_REFRESH) + .cursorField(Collections.emptyList()) + .destinationSyncMode(DestinationSyncMode.OVERWRITE) + .primaryKey(Collections.emptyList()) + .aliasName("stream1") + .setSelected(false); + + final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered); + + assertEquals(expected, actual); + } + }