diff --git a/clients/bigquery/merge.go b/clients/bigquery/merge.go index dbe53764c..5d9197ee0 100644 --- a/clients/bigquery/merge.go +++ b/clients/bigquery/merge.go @@ -18,7 +18,7 @@ import ( func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error { var additionalEqualityStrings []string if tableData.TopicConfig().BigQueryPartitionSettings != nil { - distinctDates, err := tableData.DistinctDates(tableData.TopicConfig().BigQueryPartitionSettings.PartitionField) + distinctDates, err := buildDistinctDates(tableData.TopicConfig().BigQueryPartitionSettings.PartitionField, tableData.Rows()) if err != nil { return fmt.Errorf("failed to generate distinct dates: %w", err) } diff --git a/clients/bigquery/partition.go b/clients/bigquery/partition.go new file mode 100644 index 000000000..43a09af44 --- /dev/null +++ b/clients/bigquery/partition.go @@ -0,0 +1,28 @@ +package bigquery + +import ( + "fmt" + "maps" + "slices" + + "github.com/artie-labs/transfer/lib/typing/ext" +) + +func buildDistinctDates(colName string, rows []map[string]any) ([]string, error) { + dateMap := make(map[string]bool) + for _, row := range rows { + val, isOk := row[colName] + if !isOk { + return nil, fmt.Errorf("column %q does not exist in row: %v", colName, row) + } + + _time, err := ext.ParseDateFromAny(val) + if err != nil { + return nil, fmt.Errorf("column %q is not a time column, value: %v, err: %w", colName, val, err) + } + + dateMap[_time.Format(ext.PostgresDateFormat)] = true + } + + return slices.Collect(maps.Keys(dateMap)), nil +} diff --git a/clients/bigquery/partition_test.go b/clients/bigquery/partition_test.go new file mode 100644 index 000000000..2da05a837 --- /dev/null +++ b/clients/bigquery/partition_test.go @@ -0,0 +1,53 @@ +package bigquery + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestDistinctDates(t *testing.T) { + { + // Invalid date + dates, err := buildDistinctDates("ts", []map[string]any{ + {"ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)}, + {"ts": nil}, + }) + assert.ErrorContains(t, err, `column "ts" is not a time column`) + assert.Empty(t, dates) + } + { + // No dates + dates, err := buildDistinctDates("", nil) + assert.NoError(t, err) + assert.Empty(t, dates) + } + { + // One date + dates, err := buildDistinctDates("ts", []map[string]any{ + {"ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)}, + }) + assert.NoError(t, err) + assert.Equal(t, []string{"2020-01-01"}, dates) + } + { + // Two dates + dates, err := buildDistinctDates("ts", []map[string]any{ + {"ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)}, + {"ts": time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)}, + }) + assert.NoError(t, err) + assert.Equal(t, []string{"2020-01-01", "2020-01-02"}, dates) + } + { + // Three days, two unique + dates, err := buildDistinctDates("ts", []map[string]any{ + {"ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)}, + {"ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)}, + {"ts": time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano)}, + }) + assert.NoError(t, err) + assert.Equal(t, []string{"2020-01-01", "2020-01-02"}, dates) + } +} diff --git a/lib/optimization/table_data.go b/lib/optimization/table_data.go index 7d3f8b76f..6e1c4a606 100644 --- a/lib/optimization/table_data.go +++ b/lib/optimization/table_data.go @@ -13,7 +13,6 @@ import ( "github.com/artie-labs/transfer/lib/stringutil" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" - "github.com/artie-labs/transfer/lib/typing/ext" ) type TableData struct { @@ -173,8 +172,8 @@ func (t *TableData) Rows() []map[string]any { // History mode, the data is stored under `rows` rows = append(rows, t.rows...) } else { - for _, v := range t.rowsData { - rows = append(rows, v) + for _, row := range t.rowsData { + rows = append(rows, row) } } @@ -193,30 +192,6 @@ func (t *TableData) NumberOfRows() uint { return uint(len(t.rowsData)) } -func (t *TableData) DistinctDates(colName string) ([]string, error) { - retMap := make(map[string]bool) - for _, row := range t.rowsData { - val, isOk := row[colName] - if !isOk { - return nil, fmt.Errorf("col: %v does not exist on row: %v", colName, row) - } - - _time, err := ext.ParseDateFromAny(val) - if err != nil { - return nil, fmt.Errorf("col: %v is not a time column, value: %v, err: %w", colName, val, err) - } - - retMap[_time.Format(ext.PostgresDateFormat)] = true - } - - var distinctDates []string - for key := range retMap { - distinctDates = append(distinctDates, key) - } - - return distinctDates, nil -} - func (t *TableData) ResetTempTableSuffix() { if t == nil { // This is needed because we periodically wipe tableData diff --git a/lib/optimization/table_data_test.go b/lib/optimization/table_data_test.go index 9be95fb8b..bb66dcdca 100644 --- a/lib/optimization/table_data_test.go +++ b/lib/optimization/table_data_test.go @@ -2,9 +2,7 @@ package optimization import ( "fmt" - "slices" "testing" - "time" "github.com/stretchr/testify/assert" @@ -16,100 +14,6 @@ import ( "github.com/artie-labs/transfer/lib/typing/decimal" ) -func TestDistinctDates(t *testing.T) { - testCases := []struct { - name string - rowData map[string]map[string]any // pk -> { col -> val } - expectedErr string - expectedDatesString []string - }{ - { - name: "no dates", - }, - { - name: "one date", - rowData: map[string]map[string]any{ - "1": { - "ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano), - }, - }, - expectedDatesString: []string{"2020-01-01"}, - }, - { - name: "two dates", - rowData: map[string]map[string]any{ - "1": { - "ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano), - }, - "2": { - "ts": time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano), - }, - }, - expectedDatesString: []string{"2020-01-01", "2020-01-02"}, - }, - { - name: "3 dates, 2 unique", - rowData: map[string]map[string]any{ - "1": { - "ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano), - }, - "1_duplicate": { - "ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano), - }, - "2": { - "ts": time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano), - }, - }, - expectedDatesString: []string{"2020-01-01", "2020-01-02"}, - }, - { - name: "two dates, one is nil", - rowData: map[string]map[string]any{ - "1": { - "ts": time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339Nano), - }, - "2": { - "ts": nil, - }, - }, - expectedErr: "col: ts is not a time column", - }, - } - - for _, testCase := range testCases { - td := &TableData{ - rowsData: testCase.rowData, - } - - actualValues, actualErr := td.DistinctDates("ts") - if testCase.expectedErr != "" { - assert.ErrorContains(t, actualErr, testCase.expectedErr, testCase.name) - } else { - assert.NoError(t, actualErr, testCase.name) - assert.Equal(t, true, slicesEqualUnordered(testCase.expectedDatesString, actualValues), - fmt.Sprintf("2 arrays not the same, test name: %s, expected array: %v, actual array: %v", - testCase.name, testCase.expectedDatesString, actualValues)) - } - } -} - -func slicesEqualUnordered(s1, s2 []string) bool { - if len(s1) != len(s2) { - return false - } - - slices.Sort(s1) - slices.Sort(s2) - - for i, v := range s1 { - if v != s2[i] { - return false - } - } - - return true -} - func TestTableData_ReadOnlyInMemoryCols(t *testing.T) { // Making sure the columns are actually read only. var cols columns.Columns