Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination Bigquery: 1s1t: handle raw name collisions #28366

Merged
merged 4 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@
package io.airbyte.integrations.base.destination.typing_deduping;

import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator.StreamId;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -33,30 +29,6 @@ public CatalogParser(final SqlGenerator<?> sqlGenerator, String rawNamespaceOver
this.rawNamespaceOverride = rawNamespaceOverride;
}

public record ParsedCatalog(List<StreamConfig> streams) {

public StreamConfig getStream(String namespace, String name) {
return streams.stream()
.filter(s -> s.id().originalNamespace().equals(namespace) && s.id().originalName().equals(name))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(String.format(
"Could not find stream %s.%s out of streams %s",
namespace,
name,
streams.stream().map(stream -> stream.id().originalNamespace() + "." + stream.id().originalName()).toList())));
}

}

/**
 * Per-stream configuration; a {@link ParsedCatalog} holds a list of these.
 *
 * @param id the identifiers of the stream
 * @param syncMode the stream's {@link SyncMode}
 * @param destinationSyncMode the stream's {@link DestinationSyncMode}
 * @param primaryKey the primary key columns. NOTE(review): presumably an empty list when the
 *        stream has no primary key - confirm against parseCatalog.
 * @param cursor the cursor column, if there is one
 * @param columns the stream's columns and their types. Declared as a LinkedHashMap, so iteration
 *        follows insertion order.
 */
public record StreamConfig(StreamId id,
SyncMode syncMode,
DestinationSyncMode destinationSyncMode,
List<ColumnId> primaryKey,
Optional<ColumnId> cursor,
LinkedHashMap<ColumnId, AirbyteType> columns) {

}

public ParsedCatalog parseCatalog(ConfiguredAirbyteCatalog catalog) {
// this code is bad and I feel bad
// it's mostly a port of the old normalization logic to prevent tablename collisions.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.typing_deduping;

/**
 * The names associated with a single column of a final table.
 * <p>
 * Callers should generally not construct this record themselves; obtain instances from
 * {@link SqlGenerator#buildColumnId(String)} instead.
 *
 * @param name the column's name in the final table. When placing the column in a query, prefer
 *        {@link #name(String)}.
 * @param originalName the field's name in the raw JSON blob
 * @param canonicalName the field's name as the destination sees it. Used for deduping. Helpful
 *        when a destination warehouse ignores case when handling columns but keeps the case in
 *        the table schema.
 */
public record ColumnId(String name, String originalName, String canonicalName) {

  /**
   * Returns the column name with {@code quote} placed directly before and after it. The name
   * itself is inserted as-is; no escaping is applied.
   *
   * @param quote the quote sequence to wrap the name in
   * @return the quoted column name
   */
  public String name(final String quote) {
    return new StringBuilder()
        .append(quote)
        .append(name)
        .append(quote)
        .toString();
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.typing_deduping;

import java.util.List;
import java.util.Objects;

/**
 * A parsed catalog: one {@link StreamConfig} per stream.
 *
 * @param streams the parsed streams
 */
public record ParsedCatalog(List<StreamConfig> streams) {

  /**
   * Returns the first stream whose original namespace and original name equal the arguments.
   * <p>
   * Both comparisons are null-safe: a null {@code namespace} matches a stream whose original
   * namespace is null, and a stream with a null original namespace or name no longer causes a
   * {@link NullPointerException} during lookup.
   *
   * @param namespace the original namespace to look for; may be null
   * @param name the original name to look for
   * @return the first matching stream, in the order of {@link #streams()}
   * @throws IllegalArgumentException if no stream matches; the message lists all known streams
   */
  public StreamConfig getStream(String namespace, String name) {
    return streams.stream()
        .filter(s -> Objects.equals(s.id().originalNamespace(), namespace)
            && Objects.equals(s.id().originalName(), name))
        .findFirst()
        .orElseThrow(() -> new IllegalArgumentException(String.format(
            "Could not find stream %s.%s out of streams %s",
            namespace,
            name,
            streams.stream().map(stream -> stream.id().originalNamespace() + "." + stream.id().originalName()).toList())));
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,73 +4,10 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.StreamConfig;
import java.util.Optional;

public interface SqlGenerator<DialectTableDefinition> {

/**
 * The names a stream goes by: its final table, its raw table, and its original namespace/name.
 * <p>
 * Callers should generally not construct this record themselves; use
 * {@link #buildStreamId(String, String, String)} instead.
 * <p>
 * None of the names/namespaces contain quotes, but all of them are meant to be quoted. finalName
 * could be "foo bar", for instance; the caller must wrap it in quotes before using it in a query.
 *
 * @param finalNamespace the namespace where the final table will be created
 * @param finalName the name of the final table
 * @param rawNamespace the namespace where the raw table will be created (typically "airbyte")
 * @param rawName the name of the raw table (typically namespace_name, but may be different if
 *        there are collisions)
 * @param originalNamespace the stream's original namespace. NOTE(review): presumably the
 *        namespace as it appears in the catalog, before any renaming - confirm.
 * @param originalName the stream's original name. NOTE(review): presumably the name as it appears
 *        in the catalog, before any renaming - confirm.
 */
record StreamId(String finalNamespace, String finalName, String rawNamespace, String rawName, String originalNamespace, String originalName) {

  /**
   * Convenience for the {@code schema.name} syntax most databases/warehouses use to identify
   * tables: the quoted final namespace, a dot, then the quoted final name.
   */
  public String finalTableId(String quote) {
    return finalNamespace(quote) + "." + finalName(quote);
  }

  /**
   * Same as {@link #finalTableId(String)}, except that {@code suffix} is appended to the final
   * name inside the quotes.
   */
  public String finalTableId(String suffix, String quote) {
    final String suffixedName = quote + finalName + suffix + quote;
    return finalNamespace(quote) + "." + suffixedName;
  }

  /**
   * The quoted raw namespace, a dot, then the quoted raw name.
   */
  public String rawTableId(String quote) {
    return String.join("", quote, rawNamespace, quote, ".", quote, rawName, quote);
  }

  /**
   * The final table name wrapped in {@code quote}.
   */
  public String finalName(final String quote) {
    return String.join("", quote, finalName, quote);
  }

  /**
   * The final namespace wrapped in {@code quote}.
   */
  public String finalNamespace(final String quote) {
    return String.join("", quote, finalNamespace, quote);
  }

}

/**
 * The names associated with a single column of a final table.
 * <p>
 * Callers should generally not construct this record themselves; use
 * {@link #buildColumnId(String)} instead.
 *
 * @param name the column's name in the final table. When placing the column in a query, prefer
 *        {@link #name(String)}.
 * @param originalName the field's name in the raw JSON blob
 * @param canonicalName the field's name as the destination sees it. Used for deduping. Helpful
 *        when a destination warehouse ignores case when handling columns but keeps the case in
 *        the table schema.
 */
record ColumnId(String name, String originalName, String canonicalName) {

  /**
   * Returns the column name with {@code quote} placed directly before and after it. The name
   * itself is inserted as-is; no escaping is applied.
   */
  public String name(final String quote) {
    return String.join("", quote, name, quote);
  }

}

StreamId buildStreamId(String namespace, String name, String rawNamespaceOverride);

ColumnId buildColumnId(String name);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.typing_deduping;

import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;

/**
 * Per-stream configuration; a {@link ParsedCatalog} holds a list of these.
 *
 * @param id the identifiers of the stream
 * @param syncMode the stream's {@link SyncMode}
 * @param destinationSyncMode the stream's {@link DestinationSyncMode}
 * @param primaryKey the primary key columns. NOTE(review): presumably an empty list when the
 *        stream has no primary key - confirm against CatalogParser.
 * @param cursor the cursor column, if there is one
 * @param columns the stream's columns and their types. Declared as a LinkedHashMap, so iteration
 *        follows insertion order.
 */
public record StreamConfig(StreamId id,
SyncMode syncMode,
DestinationSyncMode destinationSyncMode,
List<ColumnId> primaryKey,
Optional<ColumnId> cursor,
LinkedHashMap<ColumnId, AirbyteType> columns) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.typing_deduping;

/**
 * The names a stream goes by: its final table, its raw table, and its original namespace/name.
 * <p>
 * Callers should generally not construct this record themselves; use
 * {@link SqlGenerator#buildStreamId(String, String, String)} instead.
 * <p>
 * None of the names/namespaces contain quotes, but all of them are meant to be quoted. finalName
 * could be "foo bar", for instance; the caller must wrap it in quotes before using it in a query.
 *
 * @param finalNamespace the namespace where the final table will be created
 * @param finalName the name of the final table
 * @param rawNamespace the namespace where the raw table will be created (typically "airbyte")
 * @param rawName the name of the raw table (typically namespace_name, but may be different if
 *        there are collisions; see {@link #concatenateRawTableName(String, String)})
 * @param originalNamespace the stream's original namespace. NOTE(review): presumably the
 *        namespace as it appears in the catalog, before any renaming - confirm.
 * @param originalName the stream's original name. NOTE(review): presumably the name as it appears
 *        in the catalog, before any renaming - confirm.
 */
public record StreamId(String finalNamespace,
                       String finalName,
                       String rawNamespace,
                       String rawName,
                       String originalNamespace,
                       String originalName) {

  /**
   * Convenience for the {@code schema.name} syntax most databases/warehouses use to identify
   * tables: the quoted final namespace, a dot, then the quoted final name.
   */
  public String finalTableId(String quote) {
    return quoted(finalNamespace, quote) + "." + quoted(finalName, quote);
  }

  /**
   * Same as {@link #finalTableId(String)}, except that {@code suffix} is appended to the final
   * name inside the quotes.
   */
  public String finalTableId(String suffix, String quote) {
    return quoted(finalNamespace, quote) + "." + quoted(finalName + suffix, quote);
  }

  /**
   * The quoted raw namespace, a dot, then the quoted raw name.
   */
  public String rawTableId(String quote) {
    return quoted(rawNamespace, quote) + "." + quoted(rawName, quote);
  }

  /**
   * The final table name wrapped in {@code quote}.
   */
  public String finalName(final String quote) {
    return quoted(finalName, quote);
  }

  /**
   * The final namespace wrapped in {@code quote}.
   */
  public String finalNamespace(final String quote) {
    return quoted(finalNamespace, quote);
  }

  /**
   * Builds a raw table name of the form namespace + (delimiter) + name. A stream with namespace
   * "public__ab" and name "abab_users", for example, gets the raw table name
   * "public__ab_ab___ab_abab_users".
   * <p>
   * Two problems are being solved here:
   * <ul>
   * <li>It should be possible to unambiguously parse the raw table name back into the
   * namespace/name.</li>
   * <li>Two different streams must never be able to produce the same raw table name.</li>
   * </ul>
   * The delimiter contains one more consecutive underscore than the longest underscore run found
   * in namespace + name, so it cannot occur inside either of them; that is what achieves both
   * goals.
   *
   * @param namespace the stream namespace
   * @param name the stream name
   * @return namespace, then "_ab", the underscore run, "ab_", then name
   */
  public static String concatenateRawTableName(String namespace, String name) {
    final String combined = namespace + name;

    // Single pass: track the length of the underscore run ending at the current character.
    int longestRun = 0;
    int currentRun = 0;
    for (int pos = 0; pos < combined.length(); pos++) {
      if (combined.charAt(pos) == '_') {
        currentRun++;
        if (currentRun > longestRun) {
          longestRun = currentRun;
        }
      } else {
        currentRun = 0;
      }
    }

    final String delimiter = "_ab" + "_".repeat(longestRun + 1) + "ab_";
    return namespace + delimiter + name;
  }

  /** Wraps {@code identifier} in {@code quote} on both sides, without any escaping. */
  private static String quoted(final String identifier, final String quote) {
    return quote + identifier + quote;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,17 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator.StreamId;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

class CatalogParserTest {
Expand All @@ -39,30 +34,12 @@ public void setup() {
String namespace = invocation.getArgument(0);
String name = invocation.getArgument(1);
String rawNamespace = invocation.getArgument(1);
return new StreamId(namespace, name, rawNamespace, namespace + "_" + name, namespace, name);
return new StreamId(namespace, name, rawNamespace, namespace + "_abab_" + name, namespace, name);
});

parser = new CatalogParser(sqlGenerator);
}

/**
 * The streams ("a", "b_c") and ("a_b", "c") both want the raw table name "a_b_c". Checks that the
 * parsed catalog nevertheless gives them different raw table names.
 */
@Disabled("This feature is not yet supported; see https://github.com/airbytehq/airbyte/issues/27798")
@Test
public void rawNameCollision() {
  final ParsedCatalog parsedCatalog = parser.parseCatalog(
      new ConfiguredAirbyteCatalog().withStreams(List.of(
          stream("a", "b_c"),
          stream("a_b", "c"))));

  final String firstRawName = parsedCatalog.streams().get(0).id().rawName();
  final String secondRawName = parsedCatalog.streams().get(1).id().rawName();

  assertNotEquals(firstRawName, secondRawName);
}

/**
* Both these streams will write to the same final table name ("foofoo"). Verify that they don't
* actually use the same tablename.
Expand All @@ -76,7 +53,8 @@ public void finalNameCollision() {

// emulate quoting logic that causes a name collision
String quotedName = originalName.replaceAll("bar", "");
return new StreamId(originalNamespace, quotedName, originalRawNamespace, originalNamespace + "_" + quotedName, originalNamespace, originalName);
return new StreamId(originalNamespace, quotedName, originalRawNamespace, originalNamespace + "_abab_" + quotedName, originalNamespace,
originalName);
});
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
stream("a", "foobarfoo"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.typing_deduping;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

class StreamIdTest {

  /**
   * With a fixed "_abab_" delimiter, both of these streams would end up with the raw table name
   * "aaa_abab_bbb_abab_ccc". Pins the exact names actually generated, which differ from each
   * other.
   */
  @Test
  public void rawNameCollision() {
    assertEquals("aaa_abab_bbb_ab__ab_ccc", StreamId.concatenateRawTableName("aaa_abab_bbb", "ccc"));
    assertEquals("aaa_ab__ab_bbb_abab_ccc", StreamId.concatenateRawTableName("aaa", "bbb_abab_ccc"));
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;
Expand Down
Loading
Loading