From c824ea0bc100fef695ab2ed64262afbed4f9c4ac Mon Sep 17 00:00:00 2001 From: achettyiitr Date: Thu, 14 Nov 2024 02:04:36 +0530 Subject: [PATCH] fix: bq partitioning for additional columns --- .../integrations/bigquery/bigquery_test.go | 61 +++++++++++++++++++ warehouse/integrations/bigquery/partition.go | 9 ++- .../integrations/bigquery/partition_test.go | 23 ++++++- 3 files changed, 89 insertions(+), 4 deletions(-) diff --git a/warehouse/integrations/bigquery/bigquery_test.go b/warehouse/integrations/bigquery/bigquery_test.go index 4f09cb41b8..c76c25b05e 100644 --- a/warehouse/integrations/bigquery/bigquery_test.go +++ b/warehouse/integrations/bigquery/bigquery_test.go @@ -300,6 +300,67 @@ func TestIntegration(t *testing.T) { require.ElementsMatch(t, groupsRecords, whth.UploadJobGroupsAppendRecords(userIDFormat, sourceID, destinationID, destType)) }, }, + { + name: "Append mode (partitionColumn: timestamp, partitionType: hour)", + tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "_groups"}, + warehouseEventsMap2: whth.EventsCountMap{ + // For all tables we will be appending because of preferAppend config + "identifies": 8, "users": 2, "tracks": 8, "product_track": 8, "pages": 8, "screens": 8, "aliases": 8, "_groups": 8, + }, + stagingFilePath1: "../testdata/upload-job.events-1.json", + stagingFilePath2: "../testdata/upload-job.events-1.json", + useSameUserID: true, + postLoading: func(t testing.TB, ctx context.Context, db *bigquery.Client, namespace string) { + t.Helper() + + checkTables := []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "_groups"} + + tables := listTables(t, ctx, db, namespace) + filteredTables := lo.Filter(tables, func(table *bigquery.TableMetadata, _ int) bool { + return lo.Contains(checkTables, table.Name) + }) + for _, table := range filteredTables { + require.NotNil(t, table.TimePartitioning) + require.Equal(t, "timestamp", 
table.TimePartitioning.Field) + require.Equal(t, bigquery.HourPartitioningType, table.TimePartitioning.Type) + } + partitions := listPartitions(t, ctx, db, namespace) + filteredPartitions := lo.Filter(partitions, func(table lo.Tuple2[string, string], _ int) bool { + return lo.Contains(checkTables, table.A) + }) + for _, partition := range filteredPartitions { + require.Equal(t, partition.B, "2023051204") + } + }, + configOverride: map[string]any{ + "partitionColumn": "timestamp", + "partitionType": "hour", + }, + verifySchema: func(t testing.TB, db *bigquery.Client, namespace string) { + t.Helper() + schema := bqhelper.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT t.table_name, c.column_name, c.data_type FROM %[1]s.INFORMATION_SCHEMA.TABLES as t LEFT JOIN %[1]s.INFORMATION_SCHEMA.COLUMNS as c ON (t.table_name = c.table_name) WHERE (t.table_type != 'VIEW') AND ( c.column_name != '_PARTITIONTIME' OR c.column_name IS NULL );`, namespace)) + require.Equal(t, expectedUploadJobSchema, whth.ConvertRecordsToSchema(schema)) + }, + verifyRecords: func(t testing.TB, db *bigquery.Client, sourceID, destinationID, namespace, jobRunID, taskRunID string) { + t.Helper() + identifiesRecords := bqhelper.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT %s, %s, context_traits_logins, _as, name, logins, email, original_timestamp, context_ip, context_traits_as, timestamp, received_at, context_destination_type, sent_at, context_source_type, context_traits_between, context_source_id, context_traits_name, context_request_ip, _between, context_traits_email, context_destination_id, id FROM %s.%s ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "identifies")) + require.ElementsMatch(t, identifiesRecords, whth.UploadJobIdentifiesAppendRecords(userIDFormat, sourceID, destinationID, destType)) + usersRecords := bqhelper.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_source_id, context_destination_type, context_request_ip, context_traits_name, 
context_traits_between, _as, logins, sent_at, context_traits_logins, context_ip, _between, context_traits_email, timestamp, context_destination_id, email, context_traits_as, context_source_type, SUBSTRING(id, 1, 9), %s, received_at, name, original_timestamp FROM %s.%s ORDER BY id;`, uuidTSSQL, namespace, "users")) + require.ElementsMatch(t, usersRecords, whth.UploadJobUsersAppendRecordsUsingUsersLoadFiles(userIDFormat, sourceID, destinationID, destType)) + tracksRecords := bqhelper.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT original_timestamp, context_destination_id, context_destination_type, %s, context_source_type, timestamp, id, event, sent_at, context_ip, event_text, context_source_id, context_request_ip, received_at, %s FROM %s.%s ORDER BY id;`, uuidTSSQL, userIDSQL, namespace, "tracks")) + require.ElementsMatch(t, tracksRecords, whth.UploadJobTracksAppendRecords(userIDFormat, sourceID, destinationID, destType)) + productTrackRecords := bqhelper.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT timestamp, %s, product_id, received_at, context_source_id, sent_at, context_source_type, context_ip, context_destination_type, original_timestamp, context_request_ip, context_destination_id, %s, _as, review_body, _between, review_id, event_text, id, event, rating FROM %s.%s ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "product_track")) + require.ElementsMatch(t, productTrackRecords, whth.UploadJobProductTrackAppendRecords(userIDFormat, sourceID, destinationID, destType)) + pagesRecords := bqhelper.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT %s, context_source_id, id, title, timestamp, context_source_type, _as, received_at, context_destination_id, context_ip, context_destination_type, name, original_timestamp, _between, context_request_ip, sent_at, url, %s FROM %s.%s ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "pages")) + require.ElementsMatch(t, pagesRecords, whth.UploadJobPagesAppendRecords(userIDFormat, sourceID, destinationID, 
destType)) + screensRecords := bqhelper.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_destination_type, url, context_source_type, title, original_timestamp, %s, _between, context_ip, name, context_request_ip, %s, context_source_id, id, received_at, context_destination_id, timestamp, sent_at, _as FROM %s.%s ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "screens")) + require.ElementsMatch(t, screensRecords, whth.UploadJobScreensAppendRecords(userIDFormat, sourceID, destinationID, destType)) + aliasesRecords := bqhelper.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_source_id, context_destination_id, context_ip, sent_at, id, %s, %s, previous_id, original_timestamp, context_source_type, received_at, context_destination_type, context_request_ip, timestamp FROM %s.%s ORDER BY id;`, userIDSQL, uuidTSSQL, namespace, "aliases")) + require.ElementsMatch(t, aliasesRecords, whth.UploadJobAliasesAppendRecords(userIDFormat, sourceID, destinationID, destType)) + groupsRecords := bqhelper.RetrieveRecordsFromWarehouse(t, db, fmt.Sprintf(`SELECT context_destination_type, id, _between, plan, original_timestamp, %s, context_source_id, sent_at, %s, group_id, industry, context_request_ip, context_source_type, timestamp, employees, _as, context_destination_id, received_at, name, context_ip FROM %s.%s ORDER BY id;`, uuidTSSQL, userIDSQL, namespace, "_groups")) + require.ElementsMatch(t, groupsRecords, whth.UploadJobGroupsAppendRecords(userIDFormat, sourceID, destinationID, destType)) + }, + }, { name: "Append mode (partitionColumn: received_at, partitionType: hour)", tables: []string{"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "_groups"}, diff --git a/warehouse/integrations/bigquery/partition.go b/warehouse/integrations/bigquery/partition.go index 4ba1d012a9..525925625c 100644 --- a/warehouse/integrations/bigquery/partition.go +++ b/warehouse/integrations/bigquery/partition.go @@ -17,9 +17,12 @@ var ( ) var 
supportedPartitionColumnMap = map[string]struct{}{ - "_PARTITIONTIME": {}, - "loaded_at": {}, - "received_at": {}, + "_PARTITIONTIME": {}, + "loaded_at": {}, + "received_at": {}, + "sent_at": {}, + "timestamp": {}, + "original_timestamp": {}, } var supportedPartitionTypeMap = map[string]bigquery.TimePartitioningType{ diff --git a/warehouse/integrations/bigquery/partition_test.go b/warehouse/integrations/bigquery/partition_test.go index c983d913c4..3a9252896d 100644 --- a/warehouse/integrations/bigquery/partition_test.go +++ b/warehouse/integrations/bigquery/partition_test.go @@ -182,7 +182,28 @@ func TestBigQuery_AvoidPartitionDecorator(t *testing.T) { avoidPartitionDecorator: true, }, { - name: "ingenstion partition", + name: "sent_at partition", + destConfig: map[string]interface{}{ + "partitionColumn": "sent_at", + }, + avoidPartitionDecorator: true, + }, + { + name: "timestamp partition", + destConfig: map[string]interface{}{ + "partitionColumn": "timestamp", + }, + avoidPartitionDecorator: true, + }, + { + name: "original_timestamp partition", + destConfig: map[string]interface{}{ + "partitionColumn": "original_timestamp", + }, + avoidPartitionDecorator: true, + }, + { + name: "ingestion partition", destConfig: map[string]interface{}{ "partitionColumn": "_PARTITIONTIME", },