Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DDL] Simplify Diff and not rely on *columns.Columns #1039

Merged
merged 6 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions clients/shared/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func Append(ctx context.Context, dwh destination.DataWarehouse, tableData *optim

// We don't care about srcKeysMissing because we don't drop columns when we append.
_, targetKeysMissing := columns.Diff(
tableData.ReadOnlyInMemoryCols(),
tableConfig.Columns(),
tableData.ReadOnlyInMemoryCols().GetColumns(),
tableConfig.Columns().GetColumns(),
tableData.TopicConfig().SoftDelete,
tableData.TopicConfig().IncludeArtieUpdatedAt,
tableData.TopicConfig().IncludeDatabaseUpdatedAt,
Expand Down
11 changes: 8 additions & 3 deletions clients/shared/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,14 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi
return fmt.Errorf("failed to get table config: %w", err)
}

srcKeysMissing, targetKeysMissing := columns.Diff(tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(),
tableData.TopicConfig().SoftDelete, tableData.TopicConfig().IncludeArtieUpdatedAt,
tableData.TopicConfig().IncludeDatabaseUpdatedAt, tableData.Mode())
srcKeysMissing, targetKeysMissing := columns.Diff(
tableData.ReadOnlyInMemoryCols().GetColumns(),
tableConfig.Columns().GetColumns(),
tableData.TopicConfig().SoftDelete,
tableData.TopicConfig().IncludeArtieUpdatedAt,
tableData.TopicConfig().IncludeDatabaseUpdatedAt,
tableData.Mode(),
)

tableID := dwh.IdentifierFor(tableData.TopicConfig(), tableData.Name())
createAlterTableArgs := ddl.AlterTableArgs{
Expand Down
39 changes: 15 additions & 24 deletions lib/typing/columns/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"strings"

"github.com/artie-labs/transfer/lib/config"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/maputil"
)

// shouldSkipColumn takes the `colName` and `softDelete` and will return whether we should skip this column when calculating the diff.
Expand Down Expand Up @@ -40,26 +40,18 @@ func shouldSkipColumn(colName string, softDelete bool, includeArtieUpdatedAt boo

// Diff - when given 2 maps, a source and target
// It will provide a diff in the form of 2 variables
func Diff(columnsInSource *Columns, columnsInDestination *Columns, softDelete bool, includeArtieUpdatedAt bool, includeDatabaseUpdatedAt bool, mode config.Mode) ([]Column, []Column) {
src := CloneColumns(columnsInSource)
targ := CloneColumns(columnsInDestination)
var colsToDelete []Column
for _, col := range src.GetColumns() {
_, isOk := targ.GetColumn(col.Name())
if isOk {
colsToDelete = append(colsToDelete, col)

func Diff(columnsInSource []Column, columnsInDestination []Column, softDelete bool, includeArtieUpdatedAt bool, includeDatabaseUpdatedAt bool, mode config.Mode) ([]Column, []Column) {
src := buildColumnsMap(columnsInSource)
targ := buildColumnsMap(columnsInDestination)
for _, colName := range src.Keys() {
if _, isOk := targ.Get(colName); isOk {
targ.Remove(colName)
src.Remove(colName)
}
}

// We cannot delete inside a for-loop that is iterating over src.GetColumns() because we are messing up the array order.
for _, colToDelete := range colsToDelete {
src.DeleteColumn(colToDelete.Name())
targ.DeleteColumn(colToDelete.Name())
}

var targetColumnsMissing Columns
for _, col := range src.GetColumns() {
for _, col := range src.All() {
if shouldSkipColumn(col.Name(), softDelete, includeArtieUpdatedAt, includeDatabaseUpdatedAt, mode) {
continue
}
Expand All @@ -68,7 +60,7 @@ func Diff(columnsInSource *Columns, columnsInDestination *Columns, softDelete bo
}

var sourceColumnsMissing Columns
for _, col := range targ.GetColumns() {
for _, col := range targ.All() {
if shouldSkipColumn(col.Name(), softDelete, includeArtieUpdatedAt, includeDatabaseUpdatedAt, mode) {
continue
}
Expand All @@ -79,12 +71,11 @@ func Diff(columnsInSource *Columns, columnsInDestination *Columns, softDelete bo
return sourceColumnsMissing.GetColumns(), targetColumnsMissing.GetColumns()
}

func CloneColumns(cols *Columns) *Columns {
var newCols Columns
for _, col := range cols.GetColumns() {
col.ToLowerName()
newCols.AddColumn(col)
func buildColumnsMap(cols []Column) *maputil.OrderedMap[Column] {
retMap := maputil.NewOrderedMap[Column](false)
for _, col := range cols {
retMap.Add(col.name, col)
}

return &newCols
return retMap
}
234 changes: 40 additions & 194 deletions lib/typing/columns/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,210 +98,56 @@ func TestShouldSkipColumn(t *testing.T) {
}
}

func TestDiff_VariousNils(t *testing.T) {
type _testCase struct {
name string
sourceCols *Columns
targCols *Columns

expectedSrcKeyLength int
expectedTargKeyLength int
}

var sourceColsNotNil Columns
var targColsNotNil Columns
sourceColsNotNil.AddColumn(NewColumn("foo", typing.Invalid))
targColsNotNil.AddColumn(NewColumn("foo", typing.Invalid))
testCases := []_testCase{
{
name: "both &Columns{}",
sourceCols: &Columns{},
targCols: &Columns{},
},
{
name: "only targ is &Columns{}",
sourceCols: &sourceColsNotNil,
targCols: &Columns{},
expectedTargKeyLength: 1,
},
{
name: "only source is &Columns{}",
sourceCols: &Columns{},
targCols: &targColsNotNil,
expectedSrcKeyLength: 1,
},
{
name: "both nil",
sourceCols: nil,
targCols: nil,
},
{
name: "only targ is nil",
sourceCols: &sourceColsNotNil,
targCols: nil,
expectedTargKeyLength: 1,
},
{
name: "only source is nil",
sourceCols: nil,
targCols: &targColsNotNil,
expectedSrcKeyLength: 1,
},
func TestDiff(t *testing.T) {
{
// The same columns
columns := []Column{NewColumn("a", typing.String), NewColumn("b", typing.Boolean)}
sourceKeysMissing, targKeysMissing := Diff(columns, columns, false, false, false, config.Replication)
assert.Len(t, sourceKeysMissing, 0)
assert.Len(t, targKeysMissing, 0)
}

for _, testCase := range testCases {
actualSrcKeysMissing, actualTargKeysMissing := Diff(testCase.sourceCols, testCase.targCols, false, false, false, config.Replication)
assert.Equal(t, testCase.expectedSrcKeyLength, len(actualSrcKeysMissing), testCase.name)
assert.Equal(t, testCase.expectedTargKeyLength, len(actualTargKeysMissing), testCase.name)
{
// Source column has an extra column
sourceCols := []Column{NewColumn("a", typing.String), NewColumn("b", typing.Boolean)}
targCols := []Column{NewColumn("a", typing.String)}

sourceKeysMissing, targKeysMissing := Diff(sourceCols, targCols, false, false, false, config.Replication)
assert.Len(t, sourceKeysMissing, 0)
assert.Len(t, targKeysMissing, 1)
assert.Equal(t, targKeysMissing[0], NewColumn("b", typing.Boolean))
}
}

func TestDiffBasic(t *testing.T) {
var source Columns
source.AddColumn(NewColumn("a", typing.Integer))

srcKeyMissing, targKeyMissing := Diff(&source, &source, false, false, false, config.Replication)
assert.Equal(t, len(srcKeyMissing), 0)
assert.Equal(t, len(targKeyMissing), 0)
}

func TestDiffDelta1(t *testing.T) {
var sourceCols Columns
var targCols Columns
for colName, kindDetails := range map[string]typing.KindDetails{
"a": typing.String,
"b": typing.Boolean,
"c": typing.Struct,
} {
sourceCols.AddColumn(NewColumn(colName, kindDetails))
{
// Destination has an extra column
sourceCols := []Column{NewColumn("a", typing.String)}
targCols := []Column{NewColumn("a", typing.String), NewColumn("b", typing.Boolean)}

sourceKeysMissing, targKeysMissing := Diff(sourceCols, targCols, false, false, false, config.Replication)
assert.Len(t, sourceKeysMissing, 1)
assert.Equal(t, sourceKeysMissing[0], NewColumn("b", typing.Boolean))
assert.Len(t, targKeysMissing, 0)
}

for colName, kindDetails := range map[string]typing.KindDetails{
"aa": typing.String,
"b": typing.Boolean,
"cc": typing.String,
} {
targCols.AddColumn(NewColumn(colName, kindDetails))
{
// Source and destination both have different columns
sourceCols := []Column{NewColumn("a", typing.String), NewColumn("b", typing.Boolean)}
targCols := []Column{NewColumn("c", typing.String), NewColumn("d", typing.Boolean)}

sourceKeysMissing, targKeysMissing := Diff(sourceCols, targCols, false, false, false, config.Replication)
assert.Len(t, sourceKeysMissing, 2)
assert.Equal(t, sourceKeysMissing, targCols)
assert.Len(t, targKeysMissing, 2)
assert.Equal(t, targKeysMissing, sourceCols)
}

srcKeyMissing, targKeyMissing := Diff(&sourceCols, &targCols, false, false, false, config.Replication)
assert.Equal(t, len(srcKeyMissing), 2, srcKeyMissing) // Missing aa, cc
assert.Equal(t, len(targKeyMissing), 2, targKeyMissing) // Missing aa, cc
}

func TestDiffDelta2(t *testing.T) {
var sourceCols Columns
var targetCols Columns

for colName, kindDetails := range map[string]typing.KindDetails{
"a": typing.String,
"aa": typing.String,
"b": typing.Boolean,
"c": typing.Struct,
"d": typing.String,
"CC": typing.String,
"cC": typing.String,
"Cc": typing.String,
} {
sourceCols.AddColumn(NewColumn(colName, kindDetails))
}

for colName, kindDetails := range map[string]typing.KindDetails{
"aa": typing.String,
"b": typing.Boolean,
"cc": typing.String,
"CC": typing.String,
"dd": typing.String,
} {
targetCols.AddColumn(NewColumn(colName, kindDetails))
}

srcKeyMissing, targKeyMissing := Diff(&sourceCols, &targetCols, false, false, false, config.Replication)
assert.Equal(t, len(srcKeyMissing), 1, srcKeyMissing) // Missing dd
assert.Equal(t, len(targKeyMissing), 3, targKeyMissing) // Missing a, c, d
}

func TestDiffDeterministic(t *testing.T) {
retMap := map[string]bool{}

var sourceCols Columns
var targCols Columns

sourceCols.AddColumn(NewColumn("id", typing.Integer))
sourceCols.AddColumn(NewColumn("name", typing.String))

for i := 0; i < 500; i++ {
keysMissing, targetKeysMissing := Diff(&sourceCols, &targCols, false, false, false, config.Replication)
assert.Equal(t, 0, len(keysMissing), keysMissing)

var key string
for _, targetKeyMissing := range targetKeysMissing {
key += targetKeyMissing.Name()
}

retMap[key] = false
}

assert.Equal(t, 1, len(retMap), retMap)
}

func TestCopyColMap(t *testing.T) {
func TestBuildColumnsMap(t *testing.T) {
var cols Columns
cols.AddColumn(NewColumn("hello", typing.String))
cols.AddColumn(NewColumn("created_at", typing.TimestampTZ))
cols.AddColumn(NewColumn("updated_at", typing.TimestampTZ))

copiedCols := CloneColumns(&cols)
assert.Equal(t, copiedCols, &cols)

//Delete a row from copiedCols
copiedCols.columns = copiedCols.columns[1:]
assert.NotEqual(t, copiedCols, &cols)
}

func TestCloneColumns(t *testing.T) {
type _testCase struct {
name string
cols *Columns
expectedCols *Columns
}
copiedCols := buildColumnsMap(cols.GetColumns())
copiedCols.Remove("hello")

var cols Columns
cols.AddColumn(NewColumn("foo", typing.String))
cols.AddColumn(NewColumn("bar", typing.String))
cols.AddColumn(NewColumn("xyz", typing.String))
cols.AddColumn(NewColumn("abc", typing.String))

var mixedCaseCols Columns
mixedCaseCols.AddColumn(NewColumn("foo", typing.String))
mixedCaseCols.AddColumn(NewColumn("bAr", typing.String))
mixedCaseCols.AddColumn(NewColumn("XYZ", typing.String))
mixedCaseCols.AddColumn(NewColumn("aBC", typing.String))

testCases := []_testCase{
{
name: "nil col",
expectedCols: &Columns{},
},
{
name: "&Columns{}",
cols: &Columns{},
expectedCols: &Columns{},
},
{
name: "copying columns",
cols: &cols,
expectedCols: &cols,
},
{
name: "mixed case cols",
cols: &mixedCaseCols,
expectedCols: &cols,
},
}

for _, testCase := range testCases {
actualCols := CloneColumns(testCase.cols)
assert.Equal(t, testCase.expectedCols, actualCols, testCase.name)
}
assert.Len(t, copiedCols.Keys(), 2)
assert.Len(t, cols.GetColumns(), 3)
}