Skip to content

Commit

Permalink
[MysQL] Moving filter columns logic (#608)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Dec 18, 2024
1 parent 7b37af0 commit e98ae1a
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 19 deletions.
4 changes: 1 addition & 3 deletions lib/debezium/transformer/light_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package transformer

import (
"fmt"
"time"

"github.com/artie-labs/transfer/lib/cdc/util"
"github.com/artie-labs/transfer/lib/debezium"

Expand Down Expand Up @@ -47,7 +45,7 @@ func (l LightDebeziumTransformer) BuildPartitionKey(beforeRow, afterRow Row) (de
return convertPartitionKey(l.valueConverters, l.partitionKeys, row)
}

func (l LightDebeziumTransformer) BuildEventPayload(source util.Source, beforeRow Row, afterRow Row, op string, ts time.Time) (util.SchemaEventPayload, error) {
func (l LightDebeziumTransformer) BuildEventPayload(source util.Source, beforeRow Row, afterRow Row, op string) (util.SchemaEventPayload, error) {
schema := debezium.Schema{FieldsObject: []debezium.FieldsObject{}}
payload := util.Payload{
Source: source,
Expand Down
38 changes: 38 additions & 0 deletions sources/mysql/streaming/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,44 @@ func TestSchemaAdapter_SQLMode(t *testing.T) {
}
}

func TestSchemaAdapter_ColumnFiltering(t *testing.T) {
{
// Excluding column [exclude_me]
adapter := NewSchemaAdapter(config.MySQL{Database: "foo", Tables: []*config.MySQLTable{{Name: "test_table", ExcludeColumns: []string{"exclude_me"}}}}, nil)
assert.Equal(t, "foo", adapter.dbName)

assert.NoError(t, adapter.ApplyDDL(99, "CREATE TABLE test_table (id INT PRIMARY KEY, exclude_me VARCHAR(255));"))
assert.Len(t, adapter.adapters, 1)

tblAdapter, ok := adapter.GetTableAdapter("test_table")
assert.True(t, ok)

assert.Len(t, tblAdapter.columns, 2)

parsedCols, err := tblAdapter.buildParsedColumns()
assert.NoError(t, err)
assert.Len(t, parsedCols, 1)
assert.Equal(t, "id", parsedCols[0].Name)
}
{
// Not excluding
adapter := NewSchemaAdapter(config.MySQL{Database: "foo", Tables: []*config.MySQLTable{{Name: "test_table"}}}, nil)
assert.Equal(t, "foo", adapter.dbName)

assert.NoError(t, adapter.ApplyDDL(99, "CREATE TABLE test_table (id INT PRIMARY KEY, name VARCHAR(255));"))
assert.Len(t, adapter.adapters, 1)

tblAdapter, ok := adapter.GetTableAdapter("test_table")
assert.True(t, ok)

assert.Len(t, tblAdapter.columns, 2)

parsedCols, err := tblAdapter.buildParsedColumns()
assert.NoError(t, err)
assert.Len(t, parsedCols, 2)
}
}

func TestSchemaAdapter_ApplyDDL(t *testing.T) {
{
// Column rename
Expand Down
34 changes: 19 additions & 15 deletions sources/mysql/streaming/ddl/table_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,8 @@ func (t TableAdapter) buildFieldConverters() ([]transformer.FieldConverter, erro
return nil, nil
}

// Exclude columns (if any) from the table metadata
cols, err := column.FilterOutExcludedColumns(t.GetParsedColumns(), t.tableCfg.ExcludeColumns, t.PartitionKeys())
if err != nil {
return nil, err
}

// Include columns (if any) from the table metadata
cols, err = column.FilterForIncludedColumns(cols, t.tableCfg.IncludeColumns, t.PartitionKeys())
if err != nil {
return nil, err
}

fieldConverters := make([]transformer.FieldConverter, len(cols))
for i, col := range cols {
fieldConverters := make([]transformer.FieldConverter, len(t.parsedColumns))
for i, col := range t.parsedColumns {
converter, err := converters.ValueConverterForType(col.Type, col.Opts)
if err != nil {
return nil, fmt.Errorf("failed to build value converter for column %q: %w", col.Name, err)
Expand All @@ -107,7 +95,23 @@ func (t TableAdapter) buildParsedColumns() ([]schema.Column, error) {
})
}

return parsedColumns, nil
if t.tableCfg == nil {
return parsedColumns, nil
}

// Exclude columns (if any) from the table metadata
cols, err := column.FilterOutExcludedColumns(parsedColumns, t.tableCfg.ExcludeColumns, t.PartitionKeys())
if err != nil {
return nil, err
}

// Include columns (if any) from the table metadata
cols, err = column.FilterForIncludedColumns(cols, t.tableCfg.IncludeColumns, t.PartitionKeys())
if err != nil {
return nil, err
}

return cols, nil
}

func (t TableAdapter) TopicSuffix() string {
Expand Down
2 changes: 1 addition & 1 deletion sources/mysql/streaming/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (i *Iterator) processDML(ts time.Time, event *replication.BinlogEvent) ([]l
return nil, fmt.Errorf("failed to preprocess after row: %w", err)
}

dbzMessage, err := dbz.BuildEventPayload(sourcePayload, beforeRow, afterRow, operation, ts)
dbzMessage, err := dbz.BuildEventPayload(sourcePayload, beforeRow, afterRow, operation)
if err != nil {
return nil, fmt.Errorf("failed to build event payload: %w", err)
}
Expand Down

0 comments on commit e98ae1a

Please sign in to comment.