Skip to content

Commit

Permalink
šŸ› Server can display streams with the same name in different namespacā€¦
Browse files Browse the repository at this point in the history
ā€¦es (#13855)
  • Loading branch information
edgao authored Jun 16, 2022
1 parent 1d99b58 commit bf52c2a
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,17 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti

@VisibleForTesting
protected static AirbyteCatalog updateSchemaWithDiscovery(final AirbyteCatalog original, final AirbyteCatalog discovered) {
final Map<String, AirbyteStreamAndConfiguration> originalStreamsByName = original.getStreams()
// We can't directly use s.getStream() as the key, because it contains a bunch of other fields
// so instead we just define a quick-and-dirty record class.
final Map<Stream, AirbyteStreamAndConfiguration> originalStreamsByName = original.getStreams()
.stream()
.collect(toMap(s -> s.getStream().getName(), s -> s));
.collect(toMap(s -> new Stream(s.getStream().getName(), s.getStream().getNamespace()), s -> s));

final List<AirbyteStreamAndConfiguration> streams = new ArrayList<>();

for (final AirbyteStreamAndConfiguration s : discovered.getStreams()) {
final AirbyteStream stream = s.getStream();
final AirbyteStreamAndConfiguration originalStream = originalStreamsByName.get(stream.getName());
final AirbyteStreamAndConfiguration originalStream = originalStreamsByName.get(new Stream(stream.getName(), stream.getNamespace()));
final AirbyteStreamConfiguration outputStreamConfig;

if (originalStream != null) {
Expand Down Expand Up @@ -326,11 +328,6 @@ private List<UUID> updateOperations(final WebBackendConnectionUpdate webBackendC
return operationIds;
}

private UUID getWorkspaceIdForConnection(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException {
final UUID sourceId = connectionsHandler.getConnection(connectionId).getSourceId();
return getWorkspaceIdForSource(sourceId);
}

private UUID getWorkspaceIdForSource(final UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException {
return sourceHandler.getSource(new SourceIdRequestBody().sourceId(sourceId)).getWorkspaceId();
}
Expand Down Expand Up @@ -412,4 +409,13 @@ protected static ConnectionSearch toConnectionSearch(final WebBackendConnectionS
.status(webBackendConnectionSearch.getStatus());
}

/**
* Equivalent to {@see io.airbyte.integrations.base.AirbyteStreamNameNamespacePair}. Intentionally
* not using that class because it doesn't make sense for airbyte-server to depend on
* base-java-integration.
*/
private record Stream(String name, String namespace) {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.Lists;
import io.airbyte.api.model.generated.AirbyteCatalog;
import io.airbyte.api.model.generated.AirbyteStream;
import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration;
import io.airbyte.api.model.generated.AttemptRead;
import io.airbyte.api.model.generated.AttemptStatus;
Expand Down Expand Up @@ -773,4 +774,52 @@ public void testUpdateSchemaWithDiscoveryMergeNewStream() {
assertEquals(expected, actual);
}

@Test
public void testUpdateSchemaWithNamespacedStreams() {
final AirbyteCatalog original = ConnectionHelpers.generateBasicApiCatalog();
final AirbyteStreamAndConfiguration stream1Config = original.getStreams().get(0);
final AirbyteStream stream1 = stream1Config.getStream();
final AirbyteStream stream2 = new AirbyteStream()
.name(stream1.getName())
.namespace("second_namespace")
.jsonSchema(stream1.getJsonSchema())
.defaultCursorField(stream1.getDefaultCursorField())
.supportedSyncModes(stream1.getSupportedSyncModes())
.sourceDefinedCursor(stream1.getSourceDefinedCursor())
.sourceDefinedPrimaryKey(stream1.getSourceDefinedPrimaryKey());
final AirbyteStreamAndConfiguration stream2Config = new AirbyteStreamAndConfiguration()
.config(stream1Config.getConfig())
.stream(stream2);
original.getStreams().add(stream2Config);

final AirbyteCatalog discovered = ConnectionHelpers.generateBasicApiCatalog();
discovered.getStreams().get(0).getStream()
.name("stream1")
.jsonSchema(CatalogHelpers.fieldsToJsonSchema(Field.of("field1", JsonSchemaType.STRING)))
.supportedSyncModes(List.of(SyncMode.FULL_REFRESH));
discovered.getStreams().get(0).getConfig()
.syncMode(SyncMode.FULL_REFRESH)
.cursorField(Collections.emptyList())
.destinationSyncMode(DestinationSyncMode.OVERWRITE)
.primaryKey(Collections.emptyList())
.aliasName("stream1");

final AirbyteCatalog expected = ConnectionHelpers.generateBasicApiCatalog();
expected.getStreams().get(0).getStream()
.name("stream1")
.jsonSchema(CatalogHelpers.fieldsToJsonSchema(Field.of("field1", JsonSchemaType.STRING)))
.supportedSyncModes(List.of(SyncMode.FULL_REFRESH));
expected.getStreams().get(0).getConfig()
.syncMode(SyncMode.FULL_REFRESH)
.cursorField(Collections.emptyList())
.destinationSyncMode(DestinationSyncMode.OVERWRITE)
.primaryKey(Collections.emptyList())
.aliasName("stream1")
.setSelected(false);

final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered);

assertEquals(expected, actual);
}

}

0 comments on commit bf52c2a

Please sign in to comment.