From e98ae1a54676d41d95941199ccc134bba3f2d4e9 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 18 Dec 2024 11:32:29 -0800 Subject: [PATCH] [MysQL] Moving filter columns logic (#608) --- lib/debezium/transformer/light_transformer.go | 4 +- sources/mysql/streaming/ddl/ddl_test.go | 38 +++++++++++++++++++ sources/mysql/streaming/ddl/table_adapter.go | 34 +++++++++-------- sources/mysql/streaming/dml.go | 2 +- 4 files changed, 59 insertions(+), 19 deletions(-) diff --git a/lib/debezium/transformer/light_transformer.go b/lib/debezium/transformer/light_transformer.go index 7f1124ba..5a1b72db 100644 --- a/lib/debezium/transformer/light_transformer.go +++ b/lib/debezium/transformer/light_transformer.go @@ -2,8 +2,6 @@ package transformer import ( "fmt" - "time" - "github.com/artie-labs/transfer/lib/cdc/util" "github.com/artie-labs/transfer/lib/debezium" @@ -47,7 +45,7 @@ func (l LightDebeziumTransformer) BuildPartitionKey(beforeRow, afterRow Row) (de return convertPartitionKey(l.valueConverters, l.partitionKeys, row) } -func (l LightDebeziumTransformer) BuildEventPayload(source util.Source, beforeRow Row, afterRow Row, op string, ts time.Time) (util.SchemaEventPayload, error) { +func (l LightDebeziumTransformer) BuildEventPayload(source util.Source, beforeRow Row, afterRow Row, op string) (util.SchemaEventPayload, error) { schema := debezium.Schema{FieldsObject: []debezium.FieldsObject{}} payload := util.Payload{ Source: source, diff --git a/sources/mysql/streaming/ddl/ddl_test.go b/sources/mysql/streaming/ddl/ddl_test.go index 7f9e7fa2..e1616583 100644 --- a/sources/mysql/streaming/ddl/ddl_test.go +++ b/sources/mysql/streaming/ddl/ddl_test.go @@ -58,6 +58,44 @@ func TestSchemaAdapter_SQLMode(t *testing.T) { } } +func TestSchemaAdapter_ColumnFiltering(t *testing.T) { + { + // Excluding column [exclude_me] + adapter := NewSchemaAdapter(config.MySQL{Database: "foo", Tables: []*config.MySQLTable{{Name: "test_table", ExcludeColumns: []string{"exclude_me"}}}}, nil) + assert.Equal(t, "foo", adapter.dbName) + + assert.NoError(t, adapter.ApplyDDL(99, "CREATE TABLE test_table (id INT PRIMARY KEY, exclude_me VARCHAR(255));")) + assert.Len(t, adapter.adapters, 1) + + tblAdapter, ok := adapter.GetTableAdapter("test_table") + assert.True(t, ok) + + assert.Len(t, tblAdapter.columns, 2) + + parsedCols, err := tblAdapter.buildParsedColumns() + assert.NoError(t, err) + assert.Len(t, parsedCols, 1) + assert.Equal(t, "id", parsedCols[0].Name) + } + { + // Not excluding + adapter := NewSchemaAdapter(config.MySQL{Database: "foo", Tables: []*config.MySQLTable{{Name: "test_table"}}}, nil) + assert.Equal(t, "foo", adapter.dbName) + + assert.NoError(t, adapter.ApplyDDL(99, "CREATE TABLE test_table (id INT PRIMARY KEY, name VARCHAR(255));")) + assert.Len(t, adapter.adapters, 1) + + tblAdapter, ok := adapter.GetTableAdapter("test_table") + assert.True(t, ok) + + assert.Len(t, tblAdapter.columns, 2) + + parsedCols, err := tblAdapter.buildParsedColumns() + assert.NoError(t, err) + assert.Len(t, parsedCols, 2) + } +} + func TestSchemaAdapter_ApplyDDL(t *testing.T) { { // Column rename diff --git a/sources/mysql/streaming/ddl/table_adapter.go b/sources/mysql/streaming/ddl/table_adapter.go index 2a25a4a2..f0795379 100644 --- a/sources/mysql/streaming/ddl/table_adapter.go +++ b/sources/mysql/streaming/ddl/table_adapter.go @@ -68,20 +68,8 @@ func (t TableAdapter) buildFieldConverters() ([]transformer.FieldConverter, erro return nil, nil } - // Exclude columns (if any) from the table metadata - cols, err := column.FilterOutExcludedColumns(t.GetParsedColumns(), t.tableCfg.ExcludeColumns, t.PartitionKeys()) - if err != nil { - return nil, err - } - - // Include columns (if any) from the table metadata - cols, err = column.FilterForIncludedColumns(cols, t.tableCfg.IncludeColumns, t.PartitionKeys()) - if err != nil { - return nil, err - } - - fieldConverters := make([]transformer.FieldConverter, len(cols)) - for i, col := range cols { + fieldConverters := make([]transformer.FieldConverter, len(t.parsedColumns)) + for i, col := range t.parsedColumns { converter, err := converters.ValueConverterForType(col.Type, col.Opts) if err != nil { return nil, fmt.Errorf("failed to build value converter for column %q: %w", col.Name, err) @@ -107,7 +95,23 @@ func (t TableAdapter) buildParsedColumns() ([]schema.Column, error) { }) } - return parsedColumns, nil + if t.tableCfg == nil { + return parsedColumns, nil + } + + // Exclude columns (if any) from the table metadata + cols, err := column.FilterOutExcludedColumns(parsedColumns, t.tableCfg.ExcludeColumns, t.PartitionKeys()) + if err != nil { + return nil, err + } + + // Include columns (if any) from the table metadata + cols, err = column.FilterForIncludedColumns(cols, t.tableCfg.IncludeColumns, t.PartitionKeys()) + if err != nil { + return nil, err + } + + return cols, nil } func (t TableAdapter) TopicSuffix() string { diff --git a/sources/mysql/streaming/dml.go b/sources/mysql/streaming/dml.go index dd1372b4..f9587f60 100644 --- a/sources/mysql/streaming/dml.go +++ b/sources/mysql/streaming/dml.go @@ -93,7 +93,7 @@ func (i *Iterator) processDML(ts time.Time, event *replication.BinlogEvent) ([]l return nil, fmt.Errorf("failed to preprocess after row: %w", err) } - dbzMessage, err := dbz.BuildEventPayload(sourcePayload, beforeRow, afterRow, operation, ts) + dbzMessage, err := dbz.BuildEventPayload(sourcePayload, beforeRow, afterRow, operation) if err != nil { return nil, fmt.Errorf("failed to build event payload: %w", err) }