Skip to content

Commit

Permalink
Refactor Distinct Dates (#1030)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Nov 12, 2024
1 parent a670e8a commit 6cc1c7b
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 124 deletions.
2 changes: 1 addition & 1 deletion clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error {
var additionalEqualityStrings []string
if tableData.TopicConfig().BigQueryPartitionSettings != nil {
distinctDates, err := tableData.DistinctDates(tableData.TopicConfig().BigQueryPartitionSettings.PartitionField)
distinctDates, err := buildDistinctDates(tableData.TopicConfig().BigQueryPartitionSettings.PartitionField, tableData.Rows())
if err != nil {
return fmt.Errorf("failed to generate distinct dates: %w", err)
}
Expand Down
28 changes: 28 additions & 0 deletions clients/bigquery/partition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package bigquery

import (
"fmt"
"maps"
"slices"

"github.com/artie-labs/transfer/lib/typing/ext"
)

// buildDistinctDates returns the distinct dates found under colName across
// rows. Each value is parsed with ext.ParseDateFromAny and rendered with
// ext.PostgresDateFormat; duplicates are collapsed.
//
// The result is sorted lexicographically so that callers (and tests) see a
// deterministic order instead of Go's randomized map iteration order.
//
// An error is returned as soon as a row is missing colName or holds a value
// that ext.ParseDateFromAny rejects. With no rows, the result is empty and the
// error is nil.
func buildDistinctDates(colName string, rows []map[string]any) ([]string, error) {
	// Used purely as a set; struct{} values carry no per-entry storage.
	dateSet := make(map[string]struct{})
	for _, row := range rows {
		val, isOk := row[colName]
		if !isOk {
			return nil, fmt.Errorf("column %q does not exist in row: %v", colName, row)
		}

		_time, err := ext.ParseDateFromAny(val)
		if err != nil {
			return nil, fmt.Errorf("column %q is not a time column, value: %v, err: %w", colName, val, err)
		}

		dateSet[_time.Format(ext.PostgresDateFormat)] = struct{}{}
	}

	// maps.Keys yields keys in unspecified order; sort for stable output.
	return slices.Sorted(maps.Keys(dateSet)), nil
}
53 changes: 53 additions & 0 deletions clients/bigquery/partition_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package bigquery

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestDistinctDates exercises buildDistinctDates with invalid, empty, single,
// multiple and duplicated date inputs.
//
// Cases that expect more than one date use assert.ElementsMatch rather than
// assert.Equal, so the test does not depend on the order in which the dates
// are returned.
func TestDistinctDates(t *testing.T) {
	{
		// Invalid date
		dates, err := buildDistinctDates("ts", []map[string]any{
			{"ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)},
			{"ts": nil},
		})
		assert.ErrorContains(t, err, `column "ts" is not a time column`)
		assert.Empty(t, dates)
	}
	{
		// No dates
		dates, err := buildDistinctDates("", nil)
		assert.NoError(t, err)
		assert.Empty(t, dates)
	}
	{
		// One date
		dates, err := buildDistinctDates("ts", []map[string]any{
			{"ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)},
		})
		assert.NoError(t, err)
		assert.Equal(t, []string{"2020-01-01"}, dates)
	}
	{
		// Two dates
		dates, err := buildDistinctDates("ts", []map[string]any{
			{"ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)},
			{"ts": time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)},
		})
		assert.NoError(t, err)
		// Order-insensitive comparison: the result order is not part of what is checked here.
		assert.ElementsMatch(t, []string{"2020-01-01", "2020-01-02"}, dates)
	}
	{
		// Three days, two unique
		dates, err := buildDistinctDates("ts", []map[string]any{
			{"ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)},
			{"ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)},
			{"ts": time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)},
		})
		assert.NoError(t, err)
		// ElementsMatch also checks multiplicity, so a leaked duplicate would still fail.
		assert.ElementsMatch(t, []string{"2020-01-01", "2020-01-02"}, dates)
	}
}
29 changes: 2 additions & 27 deletions lib/optimization/table_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/artie-labs/transfer/lib/stringutil"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/ext"
)

type TableData struct {
Expand Down Expand Up @@ -173,8 +172,8 @@ func (t *TableData) Rows() []map[string]any {
// History mode, the data is stored under `rows`
rows = append(rows, t.rows...)
} else {
for _, v := range t.rowsData {
rows = append(rows, v)
for _, row := range t.rowsData {
rows = append(rows, row)
}
}

Expand All @@ -193,30 +192,6 @@ func (t *TableData) NumberOfRows() uint {
return uint(len(t.rowsData))
}

// DistinctDates gathers the distinct dates stored under colName in
// t.rowsData. Every value is parsed via ext.ParseDateFromAny and formatted
// with ext.PostgresDateFormat before de-duplication.
//
// The order of the returned slice is unspecified. An error is returned if a
// row lacks colName or its value cannot be parsed as a date; when there are no
// rows the result is a nil slice and a nil error.
func (t *TableData) DistinctDates(colName string) ([]string, error) {
	var dates []string
	seen := make(map[string]bool)

	for _, row := range t.rowsData {
		rawValue, found := row[colName]
		if !found {
			return nil, fmt.Errorf("col: %v does not exist on row: %v", colName, row)
		}

		parsed, parseErr := ext.ParseDateFromAny(rawValue)
		if parseErr != nil {
			return nil, fmt.Errorf("col: %v is not a time column, value: %v, err: %w", colName, rawValue, parseErr)
		}

		// Record each formatted date only the first time it is encountered.
		formatted := parsed.Format(ext.PostgresDateFormat)
		if !seen[formatted] {
			seen[formatted] = true
			dates = append(dates, formatted)
		}
	}

	return dates, nil
}

func (t *TableData) ResetTempTableSuffix() {
if t == nil {
// This is needed because we periodically wipe tableData
Expand Down
96 changes: 0 additions & 96 deletions lib/optimization/table_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package optimization

import (
"fmt"
"slices"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -16,100 +14,6 @@ import (
"github.com/artie-labs/transfer/lib/typing/decimal"
)

// TestDistinctDates runs TableData.DistinctDates against a table of row sets
// keyed by primary key and checks either the expected error substring or the
// expected set of dates (compared without regard to order).
func TestDistinctDates(t *testing.T) {
	// The two timestamps every scenario is built from.
	jan1 := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)
	jan2 := time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)

	type scenario struct {
		name      string
		rows      map[string]map[string]any // pk -> { col -> val }
		wantErr   string
		wantDates []string
	}

	scenarios := []scenario{
		{
			name: "no dates",
		},
		{
			name: "one date",
			rows: map[string]map[string]any{
				"1": {"ts": jan1},
			},
			wantDates: []string{"2020-01-01"},
		},
		{
			name: "two dates",
			rows: map[string]map[string]any{
				"1": {"ts": jan1},
				"2": {"ts": jan2},
			},
			wantDates: []string{"2020-01-01", "2020-01-02"},
		},
		{
			name: "3 dates, 2 unique",
			rows: map[string]map[string]any{
				"1":           {"ts": jan1},
				"1_duplicate": {"ts": jan1},
				"2":           {"ts": jan2},
			},
			wantDates: []string{"2020-01-01", "2020-01-02"},
		},
		{
			name: "two dates, one is nil",
			rows: map[string]map[string]any{
				"1": {"ts": jan1},
				"2": {"ts": nil},
			},
			wantErr: "col: ts is not a time column",
		},
	}

	for _, sc := range scenarios {
		td := &TableData{rowsData: sc.rows}
		got, err := td.DistinctDates("ts")

		// Error scenarios only check the message; the returned dates are not inspected.
		if sc.wantErr != "" {
			assert.ErrorContains(t, err, sc.wantErr, sc.name)
			continue
		}

		assert.NoError(t, err, sc.name)
		assert.Equal(t, true, slicesEqualUnordered(sc.wantDates, got),
			fmt.Sprintf("2 arrays not the same, test name: %s, expected array: %v, actual array: %v",
				sc.name, sc.wantDates, got))
	}
}

// slicesEqualUnordered reports whether s1 and s2 hold the same strings with
// the same multiplicities, ignoring element order.
//
// The comparison is done on sorted copies, so neither argument is reordered
// as a side effect of the call.
func slicesEqualUnordered(s1, s2 []string) bool {
	// Cheap rejection before paying for the copies and sorts.
	if len(s1) != len(s2) {
		return false
	}

	sorted1 := slices.Clone(s1)
	sorted2 := slices.Clone(s2)
	slices.Sort(sorted1)
	slices.Sort(sorted2)

	return slices.Equal(sorted1, sorted2)
}

func TestTableData_ReadOnlyInMemoryCols(t *testing.T) {
// Making sure the columns are actually read only.
var cols columns.Columns
Expand Down

0 comments on commit 6cc1c7b

Please sign in to comment.