Skip to content

Commit

Permalink
move records to separate files
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Jul 17, 2023
1 parent 889704a commit 69cd763
Show file tree
Hide file tree
Showing 16 changed files with 127 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@
package io.airbyte.integrations.base.destination.typing_deduping;

import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator.StreamId;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -33,30 +29,6 @@ public CatalogParser(final SqlGenerator<?> sqlGenerator, String rawNamespaceOver
this.rawNamespaceOverride = rawNamespaceOverride;
}

public record ParsedCatalog(List<StreamConfig> streams) {

public StreamConfig getStream(String namespace, String name) {
return streams.stream()
.filter(s -> s.id().originalNamespace().equals(namespace) && s.id().originalName().equals(name))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(String.format(
"Could not find stream %s.%s out of streams %s",
namespace,
name,
streams.stream().map(stream -> stream.id().originalNamespace() + "." + stream.id().originalName()).toList())));
}

}

public record StreamConfig(StreamId id,
SyncMode syncMode,
DestinationSyncMode destinationSyncMode,
List<ColumnId> primaryKey,
Optional<ColumnId> cursor,
LinkedHashMap<ColumnId, AirbyteType> columns) {

}

public ParsedCatalog parseCatalog(ConfiguredAirbyteCatalog catalog) {
// this code is bad and I feel bad
// it's mostly a port of the old normalization logic to prevent tablename collisions.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.airbyte.integrations.base.destination.typing_deduping;

/**
 * Identifies a single column across the raw JSON blob and the typed final table. Callers should
 * normally go through {@link SqlGenerator#buildColumnId(String)} instead of constructing this
 * directly.
 *
 * @param name the column's name in the final table, unquoted; prefer {@link #name(String)} when
 *        embedding it in SQL
 * @param originalName the field's name inside the raw JSON blob
 * @param canonicalName the destination's canonical form of the name, used for deduping — e.g. a
 *        warehouse that matches columns case-insensitively but stores case in its schema
 */
public record ColumnId(String name, String originalName, String canonicalName) {

  /**
   * The final-table column name wrapped in the given quote character.
   */
  public String name(final String quote) {
    // StringBuilder.append mirrors `+` concatenation semantics exactly.
    return new StringBuilder()
        .append(quote)
        .append(name)
        .append(quote)
        .toString();
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.airbyte.integrations.base.destination.typing_deduping;

import java.util.List;
import java.util.Objects;

/**
 * The result of parsing a {@link io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog}: one
 * {@link StreamConfig} per configured stream.
 *
 * @param streams the parsed stream configs, in catalog order
 */
public record ParsedCatalog(List<StreamConfig> streams) {

  /**
   * Look up a stream by its original (i.e. source-declared) namespace and name.
   *
   * @param namespace the stream's original namespace; may be null for streams without an explicit
   *        namespace
   * @param name the stream's original name
   * @return the matching {@link StreamConfig}
   * @throws IllegalArgumentException if no stream matches; the message lists all known streams
   */
  public StreamConfig getStream(String namespace, String name) {
    for (final StreamConfig stream : streams) {
      // Objects.equals is null-safe: a stream with no explicit namespace has a null
      // originalNamespace, which would throw NPE under a plain .equals() call.
      if (Objects.equals(stream.id().originalNamespace(), namespace)
          && Objects.equals(stream.id().originalName(), name)) {
        return stream;
      }
    }
    throw new IllegalArgumentException(String.format(
        "Could not find stream %s.%s out of streams %s",
        namespace,
        name,
        streams.stream().map(stream -> stream.id().originalNamespace() + "." + stream.id().originalName()).toList()));
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,73 +4,10 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.StreamConfig;
import java.util.Optional;

public interface SqlGenerator<DialectTableDefinition> {

/**
* In general, callers should not directly instantiate this class. Use
* {@link #buildStreamId(String, String, String)} instead.
* <p>
* All names/namespaces are intended to be quoted, but do not explicitly contain quotes. For
* example, finalName might be "foo bar"; the caller is required to wrap that in quotes before using
* it in a query.
*
* @param finalNamespace the namespace where the final table will be created
* @param finalName the name of the final table
* @param rawNamespace the namespace where the raw table will be created (typically "airbyte")
* @param rawName the name of the raw table (typically namespace_name, but may be different if there
* are collisions). There is no rawNamespace because we assume that we're writing raw tables
* to the airbyte namespace.
*/
record StreamId(String finalNamespace, String finalName, String rawNamespace, String rawName, String originalNamespace, String originalName) {

/**
* Most databases/warehouses use a `schema.name` syntax to identify tables. This is a convenience
* method to generate that syntax.
*/
public String finalTableId(String quote) {
return quote + finalNamespace + quote + "." + quote + finalName + quote;
}

public String finalTableId(String suffix, String quote) {
return quote + finalNamespace + quote + "." + quote + finalName + suffix + quote;
}

public String rawTableId(String quote) {
return quote + rawNamespace + quote + "." + quote + rawName + quote;
}

public String finalName(final String quote) {
return quote + finalName + quote;
}

public String finalNamespace(final String quote) {
return quote + finalNamespace + quote;
}

}

/**
* In general, callers should not directly instantiate this class. Use
* {@link #buildColumnId(String)} instead.
*
* @param name the name of the column in the final table. Callers should prefer
* {@link #name(String)} when using the column in a query.
* @param originalName the name of the field in the raw JSON blob
* @param canonicalName the name of the field according to the destination. Used for deduping.
* Useful if a destination warehouse handles columns ignoring case, but preserves case in the
* table schema.
*/
record ColumnId(String name, String originalName, String canonicalName) {

public String name(final String quote) {
return quote + name + quote;
}

}

StreamId buildStreamId(String namespace, String name, String rawNamespaceOverride);

ColumnId buildColumnId(String name);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.airbyte.integrations.base.destination.typing_deduping;

import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;

/**
 * Per-stream configuration produced by the catalog parser.
 *
 * @param id the raw/final table identifiers for this stream
 * @param syncMode the source-side sync mode
 * @param destinationSyncMode the destination-side sync mode
 * @param primaryKey the primary-key columns; presumably empty when the stream is not deduped —
 *        TODO confirm against CatalogParser
 * @param cursor the cursor column, if the stream has one
 * @param columns the stream's columns and their types; a LinkedHashMap, so iteration preserves
 *        the declared column order
 */
public record StreamConfig(StreamId id,
                           SyncMode syncMode,
                           DestinationSyncMode destinationSyncMode,
                           List<ColumnId> primaryKey,
                           Optional<ColumnId> cursor,
                           LinkedHashMap<ColumnId, AirbyteType> columns) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.airbyte.integrations.base.destination.typing_deduping;

/**
 * Identifies a stream's raw and final tables, plus the stream's original (source-declared)
 * namespace/name. Callers should normally go through
 * {@link SqlGenerator#buildStreamId(String, String, String)} instead of constructing this
 * directly.
 * <p>
 * None of these values carry quote characters; callers must quote them (e.g. via
 * {@link #finalName(String)}) before embedding them in SQL — finalName might be "foo bar".
 *
 * @param finalNamespace the namespace where the final table will be created
 * @param finalName the name of the final table
 * @param rawNamespace the namespace where the raw table will be created (typically "airbyte")
 * @param rawName the name of the raw table (typically namespace_name, but may differ when there
 *        are collisions)
 * @param originalNamespace the stream's namespace as declared by the source
 * @param originalName the stream's name as declared by the source
 */
public record StreamId(String finalNamespace, String finalName, String rawNamespace, String rawName,
                       String originalNamespace, String originalName) {

  // Wrap a single identifier in the dialect's quote character.
  private static String quoted(final String quote, final String identifier) {
    return quote + identifier + quote;
  }

  /**
   * Most databases/warehouses identify tables with a `schema.name` syntax; this renders the final
   * table in that form.
   */
  public String finalTableId(String quote) {
    return quoted(quote, finalNamespace) + "." + quoted(quote, finalName);
  }

  /**
   * Like {@link #finalTableId(String)}, but with a suffix appended to the table name (inside the
   * closing quote).
   */
  public String finalTableId(String suffix, String quote) {
    return quoted(quote, finalNamespace) + "." + quoted(quote, finalName + suffix);
  }

  /** The raw table in quoted `schema.name` form. */
  public String rawTableId(String quote) {
    return quoted(quote, rawNamespace) + "." + quoted(quote, rawName);
  }

  /** The final table's bare name, quoted. */
  public String finalName(final String quote) {
    return quoted(quote, finalName);
  }

  /** The final table's namespace, quoted. */
  public String finalNamespace(final String quote) {
    return quoted(quote, finalNamespace);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator.StreamId;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;
Expand Down Expand Up @@ -108,7 +108,7 @@ private CheckedConsumer<AirbyteStreamNameNamespacePair, InterruptedException> ty
final BigQueryDestinationHandler destinationHandler,
final ParsedCatalog parsedCatalog,
final boolean use1s1t,
final Map<SqlGenerator.StreamId, String> overwriteStreamsWithTmpTable) {
final Map<StreamId, String> overwriteStreamsWithTmpTable) {
return (streamId) -> {
if (use1s1t) {
final var streamConfig = parsedCatalog.getStream(streamId.getNamespace(), streamId.getName());
Expand Down Expand Up @@ -196,13 +196,13 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery
};
}

private Map<SqlGenerator.StreamId, String> createFinalTables(boolean use1s1t,
final ParsedCatalog parsedCatalog,
final BigQueryDestinationHandler destinationHandler,
final BigQuerySqlGenerator sqlGenerator)
private Map<StreamId, String> createFinalTables(boolean use1s1t,
final ParsedCatalog parsedCatalog,
final BigQueryDestinationHandler destinationHandler,
final BigQuerySqlGenerator sqlGenerator)
throws InterruptedException {
// TODO: share this code from BigQueryRecordConsumer
Map<SqlGenerator.StreamId, String> overwriteStreamsWithTmpTable = new HashMap<>();
Map<StreamId, String> overwriteStreamsWithTmpTable = new HashMap<>();
if (use1s1t) {
// For each stream, make sure that its corresponding final table exists.
for (StreamConfig stream : parsedCatalog.streams()) {
Expand Down Expand Up @@ -306,7 +306,7 @@ private OnCloseFunction onCloseFunction(final BigQueryStagingOperations bigQuery
private CheckedConsumer<BigQueryWriteConfig, InterruptedException> getReplaceFinalTableConsumer(boolean use1s1t,
final BigQuerySqlGenerator sqlGenerator,
final BigQueryDestinationHandler destinationHandler,
final Map<SqlGenerator.StreamId, String> overwriteStreamsWithTmpTable,
final Map<StreamId, String> overwriteStreamsWithTmpTable,
final ParsedCatalog parsedCatalog) {
return (writeConfig) -> {
final var streamConfig = parsedCatalog.getStream(writeConfig.namespace(), writeConfig.streamName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteTypeUtils;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.bigquery.BigQuerySQLNameTransformer;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.util.LinkedHashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.BigQuery;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
import io.airbyte.integrations.destination.bigquery.UploadingMethod;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.RecordDiffer;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.bigquery.BigQueryDestination;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.DestinationConfig;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;
import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
Expand Down Expand Up @@ -47,7 +46,7 @@ public void setup() {
"test-dataset-id",
mock(BigQuerySqlGenerator.class),
mock(BigQueryDestinationHandler.class),
new CatalogParser.ParsedCatalog(Collections.emptyList()));
new ParsedCatalog(Collections.emptyList()));
}

@Override
Expand Down
Loading

0 comments on commit 69cd763

Please sign in to comment.