Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Nov 19, 2024
1 parent 3065076 commit a097744
Show file tree
Hide file tree
Showing 13 changed files with 47 additions and 24 deletions.
11 changes: 7 additions & 4 deletions clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ type Store struct {

func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, useTempTable bool) error {
if !useTempTable {
return shared.Append(ctx, s, tableData, types.AdditionalSettings{})
return shared.Append(ctx, s, tableData, types.AdditionalSettings{
ColumnSettings: s.config.SharedDestinationSettings.ColumnSettings,
})
}

// We can simplify this once Google has fully rolled out the ability to execute DML on recently streamed data
Expand All @@ -55,8 +57,9 @@ func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, u
defer func() { _ = ddl.DropTemporaryTable(s, temporaryTableID, false) }()

err := shared.Append(ctx, s, tableData, types.AdditionalSettings{
UseTempTable: true,
TempTableID: temporaryTableID,
ColumnSettings: s.config.SharedDestinationSettings.ColumnSettings,
UseTempTable: true,
TempTableID: temporaryTableID,
})

if err != nil {
Expand All @@ -79,7 +82,7 @@ func (s *Store) Append(ctx context.Context, tableData *optimization.TableData, u

func (s *Store) PrepareTemporaryTable(ctx context.Context, tableData *optimization.TableData, dwh *types.DwhTableConfig, tempTableID sql.TableIdentifier, _ sql.TableIdentifier, _ types.AdditionalSettings, createTempTable bool) error {
if createTempTable {
if err := shared.CreateTable(ctx, s, tableData, dwh, tempTableID, true); err != nil {
if err := shared.CreateTable(ctx, s, tableData, dwh, s.config.SharedDestinationSettings.ColumnSettings, tempTableID, true); err != nil {
return err
}
}
Expand Down
6 changes: 3 additions & 3 deletions clients/bigquery/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
)

func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) string {
func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool, settings config.SharedDestinationColumnSettings) string {
// Doesn't look like we need to do any special type mapping.
switch kindDetails.Kind {
case typing.Float.Kind:
Expand All @@ -33,8 +34,7 @@ func (BigQueryDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) s
// We should be using TIMESTAMP since it's an absolute point in time.
return "timestamp"
case typing.EDecimal.Kind:
// TODO: Pass in
return kindDetails.ExtendedDecimalDetails.BigQueryKind(false)
return kindDetails.ExtendedDecimalDetails.BigQueryKind(settings.BigQueryNumericForVariableNumeric)
}

return kindDetails.Kind
Expand Down
1 change: 1 addition & 0 deletions clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er

return shared.Merge(ctx, s, tableData, types.MergeOpts{
AdditionalEqualityStrings: additionalEqualityStrings,
ColumnSettings: s.config.SharedDestinationSettings.ColumnSettings,
// BigQuery has DDL quotas.
RetryColBackfill: true,
// We are using BigQuery's streaming API which doesn't guarantee exactly once semantics
Expand Down
3 changes: 2 additions & 1 deletion clients/databricks/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
)

func (DatabricksDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) string {
func (DatabricksDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool, _ config.SharedDestinationColumnSettings) string {
switch kindDetails.Kind {
case typing.Float.Kind:
return "DOUBLE"
Expand Down
4 changes: 3 additions & 1 deletion clients/mssql/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"strconv"
"strings"

"github.com/artie-labs/transfer/lib/config"

"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
)

func (MSSQLDialect) DataTypeForKind(kindDetails typing.KindDetails, isPk bool) string {
func (MSSQLDialect) DataTypeForKind(kindDetails typing.KindDetails, isPk bool, _ config.SharedDestinationColumnSettings) string {
// Primary keys cannot exceed 900 chars in length.
// https://learn.microsoft.com/en-us/sql/relational-databases/tables/primary-and-foreign-key-constraints?view=sql-server-ver16#PKeys
const maxVarCharLengthForPrimaryKey = 900
Expand Down
4 changes: 3 additions & 1 deletion clients/redshift/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"strconv"
"strings"

"github.com/artie-labs/transfer/lib/config"

"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
)

func (RedshiftDialect) DataTypeForKind(kd typing.KindDetails, _ bool) string {
func (RedshiftDialect) DataTypeForKind(kd typing.KindDetails, _ bool, _ config.SharedDestinationColumnSettings) string {
switch kd.Kind {
case typing.Integer.Kind:
if kd.OptionalIntegerKind != nil {
Expand Down
4 changes: 2 additions & 2 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
if tableConfig.CreateTable() {
if err = CreateTable(ctx, dwh, tableData, tableConfig, tableID, false); err != nil {
if err = CreateTable(ctx, dwh, tableData, tableConfig, opts.ColumnSettings, tableID, false); err != nil {
return fmt.Errorf("failed to create table: %w", err)
}
} else {
if err = AlterTableAddColumns(ctx, dwh, tableConfig, tableID, targetKeysMissing); err != nil {
if err = AlterTableAddColumns(ctx, dwh, tableConfig, opts.ColumnSettings, tableID, targetKeysMissing); err != nil {
return fmt.Errorf("failed to alter table: %w", err)
}
}
Expand Down
10 changes: 6 additions & 4 deletions clients/shared/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"log/slog"
"time"

"github.com/artie-labs/transfer/lib/config"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination"
"github.com/artie-labs/transfer/lib/destination/ddl"
Expand All @@ -15,8 +17,8 @@ import (
"github.com/artie-labs/transfer/lib/typing/columns"
)

func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, tableID sql.TableIdentifier, tempTable bool) error {
query, err := ddl.BuildCreateTableSQL(dwh.Dialect(), tableID, tempTable, tableData.Mode(), tableData.ReadOnlyInMemoryCols().GetColumns())
func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData *optimization.TableData, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier, tempTable bool) error {
query, err := ddl.BuildCreateTableSQL(settings, dwh.Dialect(), tableID, tempTable, tableData.Mode(), tableData.ReadOnlyInMemoryCols().GetColumns())
if err != nil {
return fmt.Errorf("failed to build create table sql: %w", err)
}
Expand All @@ -31,7 +33,7 @@ func CreateTable(ctx context.Context, dwh destination.DataWarehouse, tableData *
return nil
}

func AlterTableAddColumns(ctx context.Context, dwh destination.DataWarehouse, tc *types.DwhTableConfig, tableID sql.TableIdentifier, cols []columns.Column) error {
func AlterTableAddColumns(ctx context.Context, dwh destination.DataWarehouse, tc *types.DwhTableConfig, settings config.SharedDestinationColumnSettings, tableID sql.TableIdentifier, cols []columns.Column) error {
if len(cols) == 0 {
return nil
}
Expand All @@ -45,7 +47,7 @@ func AlterTableAddColumns(ctx context.Context, dwh destination.DataWarehouse, tc
colsToAdd = append(colsToAdd, col)
}

sqlParts, err := ddl.BuildAlterTableAddColumns(dwh.Dialect(), tableID, colsToAdd)
sqlParts, err := ddl.BuildAlterTableAddColumns(settings, dwh.Dialect(), tableID, colsToAdd)
if err != nil {
return fmt.Errorf("failed to build alter table add columns: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion clients/snowflake/dialect/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"strconv"
"strings"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/sql"
"github.com/artie-labs/transfer/lib/typing"
)

func (SnowflakeDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool) string {
func (SnowflakeDialect) DataTypeForKind(kindDetails typing.KindDetails, _ bool, _ config.SharedDestinationColumnSettings) string {
switch kindDetails.Kind {
case typing.Struct.Kind:
// Snowflake doesn't recognize struct.
Expand Down
9 changes: 8 additions & 1 deletion lib/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Pubsub struct {
TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"`
PathToCredentials string `yaml:"pathToCredentials"`
}

type Kafka struct {
// Comma-separated Kafka servers to port.
// e.g. host1:port1,host2:port2,...
Expand All @@ -36,12 +37,18 @@ type Kafka struct {
DisableTLS bool `yaml:"disableTLS,omitempty"`
}

type SharedDestinationColumnSettings struct {
// BigQueryNumericForVariableNumeric - If enabled, we will use BigQuery's NUMERIC type for variable numeric types.
BigQueryNumericForVariableNumeric bool `yaml:"bigQueryNumericForVariableNumeric"`
}

type SharedDestinationSettings struct {
// TruncateExceededValues - This will truncate exceeded values instead of replacing it with `__artie_exceeded_value`
TruncateExceededValues bool `yaml:"truncateExceededValues"`
// ExpandStringPrecision - This will expand the string precision if the incoming data has a higher precision than the destination table.
// This is only supported by Redshift at the moment.
ExpandStringPrecision bool `yaml:"expandStringPrecision"`
ExpandStringPrecision bool `yaml:"expandStringPrecision"`
ColumnSettings SharedDestinationColumnSettings `yaml:"columnSettings"`
}

type Reporting struct {
Expand Down
8 changes: 4 additions & 4 deletions lib/destination/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func shouldCreatePrimaryKey(col columns.Column, mode config.Mode, createTable bo
return col.PrimaryKey() && mode == config.Replication && createTable
}

func BuildCreateTableSQL(dialect sql.Dialect, tableIdentifier sql.TableIdentifier, temporaryTable bool, mode config.Mode, columns []columns.Column) (string, error) {
func BuildCreateTableSQL(settings config.SharedDestinationColumnSettings, dialect sql.Dialect, tableIdentifier sql.TableIdentifier, temporaryTable bool, mode config.Mode, columns []columns.Column) (string, error) {
if len(columns) == 0 {
return "", fmt.Errorf("no columns provided")
}
Expand All @@ -34,7 +34,7 @@ func BuildCreateTableSQL(dialect sql.Dialect, tableIdentifier sql.TableIdentifie
primaryKeys = append(primaryKeys, colName)
}

parts = append(parts, fmt.Sprintf("%s %s", colName, dialect.DataTypeForKind(col.KindDetails, col.PrimaryKey())))
parts = append(parts, fmt.Sprintf("%s %s", colName, dialect.DataTypeForKind(col.KindDetails, col.PrimaryKey(), settings)))
}

if len(primaryKeys) > 0 {
Expand Down Expand Up @@ -71,14 +71,14 @@ func DropTemporaryTable(dwh destination.DataWarehouse, tableIdentifier sql.Table
return nil
}

func BuildAlterTableAddColumns(dialect sql.Dialect, tableID sql.TableIdentifier, cols []columns.Column) ([]string, error) {
func BuildAlterTableAddColumns(settings config.SharedDestinationColumnSettings, dialect sql.Dialect, tableID sql.TableIdentifier, cols []columns.Column) ([]string, error) {
var parts []string
for _, col := range cols {
if col.ShouldSkip() {
return nil, fmt.Errorf("received an invalid column %q", col.Name())
}

sqlPart := fmt.Sprintf("%s %s", dialect.QuoteIdentifier(col.Name()), dialect.DataTypeForKind(col.KindDetails, col.PrimaryKey()))
sqlPart := fmt.Sprintf("%s %s", dialect.QuoteIdentifier(col.Name()), dialect.DataTypeForKind(col.KindDetails, col.PrimaryKey(), settings))
parts = append(parts, dialect.BuildAlterColumnQuery(tableID, constants.Add, sqlPart))
}

Expand Down
5 changes: 4 additions & 1 deletion lib/destination/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"sync"

"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/sql"
)

Expand Down Expand Up @@ -35,13 +36,15 @@ func (d *DwhToTablesConfigMap) AddTableToConfig(tableID sql.TableIdentifier, con
}

type MergeOpts struct {
SubQueryDedupe bool
AdditionalEqualityStrings []string
ColumnSettings config.SharedDestinationColumnSettings
RetryColBackfill bool
SubQueryDedupe bool
}

type AdditionalSettings struct {
AdditionalCopyClause string
ColumnSettings config.SharedDestinationColumnSettings

// These settings are used for the `Append` method.
UseTempTable bool
Expand Down
3 changes: 2 additions & 1 deletion lib/sql/dialect.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sql

import (
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
Expand All @@ -25,7 +26,7 @@ type TableIdentifier interface {
type Dialect interface {
QuoteIdentifier(identifier string) string
EscapeStruct(value string) string
DataTypeForKind(kd typing.KindDetails, isPk bool) string
DataTypeForKind(kd typing.KindDetails, isPk bool, settings config.SharedDestinationColumnSettings) string
KindForDataType(_type string, stringPrecision string) (typing.KindDetails, error)
IsColumnAlreadyExistsErr(err error) bool
IsTableDoesNotExistErr(err error) bool
Expand Down

0 comments on commit a097744

Please sign in to comment.