From a10c4dca832b92e2b7eba982689ae56a8d9b3c91 Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Wed, 9 Nov 2022 14:31:47 +0530 Subject: [PATCH 1/3] fix(warehouse): id resolution index --- warehouse/identities.go | 6 +++--- warehouse/testhelper/.env | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/warehouse/identities.go b/warehouse/identities.go index 22ae74e1a5..0ca0929da9 100644 --- a/warehouse/identities.go +++ b/warehouse/identities.go @@ -212,7 +212,7 @@ func (wh *HandleT) setupIdentityTables(warehouse warehouseutils.Warehouse) { } sqlStatement = fmt.Sprintf(` - CREATE INDEX IF NOT EXISTS merge_properties_index_ %[1]s ON %[1]s ( + CREATE INDEX IF NOT EXISTS merge_properties_index_%[1]s ON %[1]s ( merge_property_1_type, merge_property_1_value, merge_property_2_type, merge_property_2_value ); @@ -260,7 +260,7 @@ func (wh *HandleT) setupIdentityTables(warehouse warehouseutils.Warehouse) { } sqlStatement = fmt.Sprintf(` - CREATE INDEX IF NOT EXISTS rudder_id_index_ %[1]s ON %[1]s (rudder_id); + CREATE INDEX IF NOT EXISTS rudder_id_index_%[1]s ON %[1]s (rudder_id); `, warehouseutils.IdentityMappingsTableName(warehouse), ) @@ -271,7 +271,7 @@ func (wh *HandleT) setupIdentityTables(warehouse warehouseutils.Warehouse) { } sqlStatement = fmt.Sprintf(` - CREATE INDEX IF NOT EXISTS merge_property_index_ %[1]s ON %[1]s ( + CREATE INDEX IF NOT EXISTS merge_property_index_%[1]s ON %[1]s ( merge_property_type, merge_property_value ); `, diff --git a/warehouse/testhelper/.env b/warehouse/testhelper/.env index 7d2484220a..b30764b4e1 100644 --- a/warehouse/testhelper/.env +++ b/warehouse/testhelper/.env @@ -41,6 +41,7 @@ RSERVER_WAREHOUSE_DELTALAKE_MAX_PARALLEL_LOADS=8 RSERVER_WAREHOUSE_WAREHOUSE_SYNC_FREQ_IGNORE=true RSERVER_WAREHOUSE_UPLOAD_FREQ_IN_S=10 RSERVER_WAREHOUSE_ENABLE_JITTER_FOR_SYNCS=false +RSERVER_WAREHOUSE_ENABLE_IDRESOLUTION=true RSERVER_EVENT_SCHEMAS_ENABLE_EVENT_SCHEMAS_FEATURE=false RSERVER_EVENT_SCHEMAS_SYNC_INTERVAL=15 From 
85e8a4bc58d5db133d788ad8e23f0780c7d9cdb3 Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Wed, 9 Nov 2022 16:00:49 +0530 Subject: [PATCH 2/3] add conditional check for orderBy clauses for received_at since ID resolution tables don't contain it. --- warehouse/bigquery/bigquery.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/warehouse/bigquery/bigquery.go b/warehouse/bigquery/bigquery.go index e4bb40fa9a..5a646ea933 100644 --- a/warehouse/bigquery/bigquery.go +++ b/warehouse/bigquery/bigquery.go @@ -396,17 +396,31 @@ func (bq *HandleT) loadTable(tableName string, _, getLoadFileLocFromTableUploads primaryJoinClause := strings.Join(primaryKeyList, " AND ") bqTable := func(name string) string { return fmt.Sprintf("`%s`.`%s`", bq.namespace, name) } + var orderByClause string + if _, ok := tableColMap["received_at"]; ok { + orderByClause = "ORDER BY received_at DESC" + } + sqlStatement := fmt.Sprintf(`MERGE INTO %[1]s AS original USING ( SELECT * FROM ( - SELECT *, row_number() OVER (PARTITION BY %[7]s ORDER BY RECEIVED_AT DESC) AS _rudder_staging_row_number FROM %[2]s + SELECT *, row_number() OVER (PARTITION BY %[7]s %[8]s) AS _rudder_staging_row_number FROM %[2]s ) AS q WHERE _rudder_staging_row_number = 1 ) AS staging ON (%[3]s) WHEN MATCHED THEN UPDATE SET %[6]s WHEN NOT MATCHED THEN - INSERT (%[4]s) VALUES (%[5]s)`, bqTable(tableName), bqTable(stagingTableName), primaryJoinClause, columnNames, stagingColumnNames, columnsWithValues, partitionKey) + INSERT (%[4]s) VALUES (%[5]s)`, + bqTable(tableName), + bqTable(stagingTableName), + primaryJoinClause, + columnNames, + stagingColumnNames, + columnsWithValues, + partitionKey, + orderByClause, + ) pkgLogger.Infof("BQ: Dedup records for table:%s using staging table: %s\n", tableName, sqlStatement) q := bq.db.Query(sqlStatement) From 81ed945c3031546a226cd51f19184e73a7806b1a Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Wed, 9 Nov 2022 16:20:35 +0530 Subject: [PATCH 3/3] some 
more changes. --- warehouse/bigquery/bigquery_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/warehouse/bigquery/bigquery_test.go b/warehouse/bigquery/bigquery_test.go index b11da1be9c..2a22552809 100644 --- a/warehouse/bigquery/bigquery_test.go +++ b/warehouse/bigquery/bigquery_test.go @@ -88,7 +88,7 @@ func TestBigQueryIntegration(t *testing.T) { testhelper.SendEvents(t, warehouseTest, sendEventsMap) testhelper.SendIntegratedEvents(t, warehouseTest, sendEventsMap) - testhelper.VerifyEventsInStagingFiles(t, warehouseTest, testhelper.StagingFilesEventsMap()) + testhelper.VerifyEventsInStagingFiles(t, warehouseTest, stagingFilesEventsMap()) testhelper.VerifyEventsInLoadFiles(t, warehouseTest, loadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, mergeEventsMap()) @@ -103,7 +103,7 @@ func TestBigQueryIntegration(t *testing.T) { testhelper.SendModifiedEvents(t, warehouseTest, sendEventsMap) testhelper.SendIntegratedEvents(t, warehouseTest, sendEventsMap) - testhelper.VerifyEventsInStagingFiles(t, warehouseTest, testhelper.StagingFilesEventsMap()) + testhelper.VerifyEventsInStagingFiles(t, warehouseTest, stagingFilesEventsMap()) testhelper.VerifyEventsInLoadFiles(t, warehouseTest, loadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, mergeEventsMap()) @@ -143,7 +143,7 @@ func TestBigQueryIntegration(t *testing.T) { testhelper.SendModifiedEvents(t, warehouseTest, sendEventsMap) testhelper.SendModifiedEvents(t, warehouseTest, sendEventsMap) - testhelper.VerifyEventsInStagingFiles(t, warehouseTest, testhelper.StagingFilesEventsMap()) + testhelper.VerifyEventsInStagingFiles(t, warehouseTest, stagingFilesEventsMap()) testhelper.VerifyEventsInLoadFiles(t, warehouseTest, loadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, 
warehouseTest, tableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, appendEventsMap()) @@ -194,6 +194,12 @@ func tableUploadsEventsMap() testhelper.EventsCountMap { return eventsMap } +func stagingFilesEventsMap() testhelper.EventsCountMap { + return testhelper.EventsCountMap{ + "wh_staging_files": 34, // Since extra 2 merge events because of ID resolution + } +} + func mergeEventsMap() testhelper.EventsCountMap { return testhelper.EventsCountMap{ "identifies": 1,