Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(warehouse): verify workspace ID in stats #2615

Merged
merged 7 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ require (
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.37.0
github.com/rivo/uniseg v0.1.0 // indirect
github.com/rs/xid v1.4.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
Expand Down Expand Up @@ -196,4 +198,7 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require github.com/golang/protobuf v1.5.2 // indirect
require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,12 @@ github.com/go-ini/ini v1.63.2/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3I
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-kit/log v0.2.0/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2CSIqUrmQPqA0gdRIlnLEY0gK5JGjh37zN5U=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
Expand Down Expand Up @@ -1010,6 +1012,7 @@ github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOq
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.10/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/maxbrunsfeld/counterfeiter/v6 v6.2.2/go.mod h1:eD9eIE7cdwcMi9rYluz88Jz2VyhSmden33/aXg4oVIY=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
Expand Down Expand Up @@ -1188,11 +1191,14 @@ github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQ
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
github.com/prometheus/client_model v0.0.0-20171117100541-99fa1f4be8e5/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4=
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
github.com/prometheus/common v0.0.0-20180110214958-89604d197083/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
Expand All @@ -1201,6 +1207,9 @@ github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.30.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.37.0 h1:ccBbHCgIiT9uSoFY0vX8H3zsNR5eLt17/RQLUvn8pXE=
github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA=
github.com/prometheus/procfs v0.0.0-20180125133057-cb4147076ac7/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
Expand Down Expand Up @@ -1808,6 +1817,7 @@ golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
102 changes: 51 additions & 51 deletions warehouse/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,19 @@ func backupRecords(args backupRecordsArgs) (backupLocation string, err error) {
}

tmpl := fmt.Sprintf(`
SELECT
json_agg(dump_table)
FROM
SELECT
json_agg(dump_table)
FROM
(
SELECT
*
FROM
%[1]s
WHERE
%[2]s
ORDER BY
id ASC
LIMIT
SELECT
*
FROM
%[1]s
WHERE
%[2]s
ORDER BY
id ASC
LIMIT
%[3]s offset %[4]s
) AS dump_table
`,
Expand Down Expand Up @@ -147,28 +147,28 @@ func usedRudderStorage(metadata []byte) bool {
func archiveUploads(dbHandle *sql.DB) {
pkgLogger.Infof(`Started archiving for warehouse`)
sqlStatement := fmt.Sprintf(`
SELECT
id,
source_id,
destination_id,
start_staging_file_id,
end_staging_file_id,
start_load_file_id,
end_load_file_id,
metadata
FROM
%s
WHERE
SELECT
id,
source_id,
destination_id,
start_staging_file_id,
end_staging_file_id,
start_load_file_id,
end_load_file_id,
metadata
FROM
%s
WHERE
(
(
metadata ->> 'archivedStagingAndLoadFiles'
):: bool IS DISTINCT
FROM
):: bool IS DISTINCT
FROM
TRUE
)
AND created_at < NOW() - INTERVAL '%d DAY'
AND status = '%s'
LIMIT
)
AND created_at < NOW() - INTERVAL '%d DAY'
AND status = '%s'
LIMIT
10000;
`,
warehouseutils.WarehouseUploadsTable,
Expand Down Expand Up @@ -225,15 +225,15 @@ func archiveUploads(dbHandle *sql.DB) {

// archive staging files
stmt := fmt.Sprintf(`
SELECT
id,
location
FROM
%s
WHERE
source_id = '%s'
AND destination_id = '%s'
AND id >= %d
SELECT
id,
location
FROM
%s
WHERE
source_id = '%s'
AND destination_id = '%s'
AND id >= %d
and id <= %d;
`,
warehouseutils.WarehouseStagingFilesTable,
Expand Down Expand Up @@ -302,9 +302,9 @@ func archiveUploads(dbHandle *sql.DB) {

// delete staging file records
stmt = fmt.Sprintf(`
DELETE FROM
%s
WHERE
DELETE FROM
%s
WHERE
id IN (%v);
`,
warehouseutils.WarehouseStagingFilesTable,
Expand All @@ -319,9 +319,9 @@ func archiveUploads(dbHandle *sql.DB) {

// delete load file records
stmt = fmt.Sprintf(`
DELETE FROM
%s
WHERE
DELETE FROM
%s
WHERE
staging_file_id = ANY($1) RETURNING location;
`,
warehouseutils.WarehouseLoadFilesTable,
Expand Down Expand Up @@ -371,11 +371,11 @@ func archiveUploads(dbHandle *sql.DB) {
// update upload metadata
u.uploadMetdata, _ = sjson.SetBytes(u.uploadMetdata, "archivedStagingAndLoadFiles", true)
stmt = fmt.Sprintf(`
UPDATE
%s
SET
metadata = $1
WHERE
UPDATE
%s
SET
metadata = $1
WHERE
id = %d;
`,
warehouseutils.WarehouseUploadsTable,
Expand All @@ -398,7 +398,7 @@ func archiveUploads(dbHandle *sql.DB) {
pkgLogger.Debugf(`[Archiver]: Archived upload: %d related staging files at: %s`, u.uploadID, storedStagingFilesLocation)
}

stats.Default.NewTaggedStat("warehouse.archiver.numArchivedUploads", stats.CountType, map[string]string{
stats.Default.NewTaggedStat("warehouse.archiver.numArchivedUploads", stats.CountType, stats.Tags{
"destination": u.destID,
"source": u.sourceID,
}).Count(1)
Expand Down
4 changes: 4 additions & 0 deletions warehouse/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func TestBigQueryIntegration(t *testing.T) {
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, loadFilesEventsMap())
testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap())
testhelper.VerifyEventsInWareHouse(t, warehouseTest, mergeEventsMap())

testhelper.VerifyWorkspaceIDInStats(t)
})

t.Run("Append Mode", func(t *testing.T) {
Expand Down Expand Up @@ -145,6 +147,8 @@ func TestBigQueryIntegration(t *testing.T) {
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, loadFilesEventsMap())
testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap())
testhelper.VerifyEventsInWareHouse(t, warehouseTest, appendEventsMap())

testhelper.VerifyWorkspaceIDInStats(t)
})
}

Expand Down
13 changes: 13 additions & 0 deletions warehouse/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ type TestHandle struct {

var handle *TestHandle

var statsToVerify = []string{
"warehouse_clickhouse_commitTimeouts",
"warehouse_clickhouse_execTimeouts",
"warehouse_clickhouse_failedRetries",
"warehouse_clickhouse_syncLoadFileTime",
"warehouse_clickhouse_downloadLoadFilesTime",
"warehouse_clickhouse_numRowsLoadFile",
}

func (*TestHandle) VerifyConnection() error {
err := testhelper.WithConstantBackoff(func() (err error) {
credentials := clickhouse.CredentialsT{
Expand Down Expand Up @@ -178,6 +187,8 @@ func TestClickHouseIntegration(t *testing.T) {
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap())
testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap())
testhelper.VerifyEventsInWareHouse(t, warehouseTest, testhelper.WarehouseEventsMap())

testhelper.VerifyWorkspaceIDInStats(t, statsToVerify...)
})

t.Run("Cluster Mode Setup", func(t *testing.T) {
Expand Down Expand Up @@ -230,6 +241,8 @@ func TestClickHouseIntegration(t *testing.T) {
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap())
testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap())
testhelper.VerifyEventsInWareHouse(t, warehouseTest, clusterWarehouseEventsMap())

testhelper.VerifyWorkspaceIDInStats(t, statsToVerify...)
})
}

Expand Down
6 changes: 6 additions & 0 deletions warehouse/datalake/datalake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func TestDatalakeIntegration(t *testing.T) {
testhelper.VerifyEventsInStagingFiles(t, warehouseTest, testhelper.StagingFilesEventsMap())
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap())
testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap())

testhelper.VerifyWorkspaceIDInStats(t)
})
t.Run("AzureDatalake", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -104,6 +106,8 @@ func TestDatalakeIntegration(t *testing.T) {
testhelper.VerifyEventsInStagingFiles(t, warehouseTest, testhelper.StagingFilesEventsMap())
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap())
testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap())

testhelper.VerifyWorkspaceIDInStats(t)
})
t.Run("GCSDatalake", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -146,6 +150,8 @@ func TestDatalakeIntegration(t *testing.T) {
testhelper.VerifyEventsInStagingFiles(t, warehouseTest, testhelper.StagingFilesEventsMap())
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap())
testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap())

testhelper.VerifyWorkspaceIDInStats(t)
})
}

Expand Down
16 changes: 8 additions & 8 deletions warehouse/deltalake/deltalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (*HandleT) DeleteBy([]string, warehouseutils.DeleteByParams) error {

// fetchTables fetch tables with tableNames
func (dl *HandleT) fetchTables(dbT *databricks.DBHandleT, schema string) (tableNames []string, err error) {
fetchTablesExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{
fetchTablesExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{
"workspaceId": dl.Warehouse.WorkspaceID,
"destination": dl.Warehouse.Destination.ID,
"destType": dl.Warehouse.Type,
Expand Down Expand Up @@ -357,7 +357,7 @@ func (dl *HandleT) partitionQuery(tableName string) (string, error) {

// ExecuteSQL executes sql using grpc Client
func (dl *HandleT) ExecuteSQL(sqlStatement, queryType string) (err error) {
execSqlStatTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{
execSqlStatTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{
"workspaceId": dl.Warehouse.WorkspaceID,
"destination": dl.Warehouse.Destination.ID,
"destType": dl.Warehouse.Type,
Expand Down Expand Up @@ -392,7 +392,7 @@ func (*HandleT) ExecuteSQLClient(dbClient *databricks.DBHandleT, sqlStatement st

// schemaExists checks it schema exists or not.
func (dl *HandleT) schemaExists(schemaName string) (exists bool, err error) {
fetchSchemasExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{
fetchSchemasExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{
"workspaceId": dl.Warehouse.WorkspaceID,
"destination": dl.Warehouse.Destination.ID,
"destType": dl.Warehouse.Type,
Expand Down Expand Up @@ -431,7 +431,7 @@ func (dl *HandleT) createSchema() (err error) {

// dropStagingTables drops staging tables
func (dl *HandleT) dropStagingTables(tableNames []string) {
dropTablesExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{
dropTablesExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{
"workspaceId": dl.Warehouse.WorkspaceID,
"destination": dl.Warehouse.Destination.ID,
"destType": dl.Warehouse.Type,
Expand Down Expand Up @@ -805,7 +805,7 @@ func (dl *HandleT) connectToWarehouse() (dbHandleT *databricks.DBHandleT, err er
Path: warehouseutils.GetConfigValue(DLPath, dl.Warehouse),
Token: warehouseutils.GetConfigValue(DLToken, dl.Warehouse),
}
connStat := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{
connStat := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{
"workspaceId": dl.Warehouse.WorkspaceID,
"destination": dl.Warehouse.Destination.ID,
"destType": dl.Warehouse.Type,
Expand All @@ -817,7 +817,7 @@ func (dl *HandleT) connectToWarehouse() (dbHandleT *databricks.DBHandleT, err er
connStat.Start()
defer connStat.End()

closeConnStat := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{
closeConnStat := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{
"workspaceId": dl.Warehouse.WorkspaceID,
"destination": dl.Warehouse.Destination.ID,
"destType": dl.Warehouse.Type,
Expand Down Expand Up @@ -936,7 +936,7 @@ func (dl *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh
filteredTablesNames = append(filteredTablesNames, tableName)
}

fetchTablesAttributesExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{
fetchTablesAttributesExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{
"workspaceId": dl.Warehouse.WorkspaceID,
"destination": dl.Warehouse.Destination.ID,
"destType": dl.Warehouse.Type,
Expand Down Expand Up @@ -1054,7 +1054,7 @@ func (*HandleT) DownloadIdentityRules(*misc.GZipWriter) (err error) {

// GetTotalCountInTable returns total count in tables.
func (dl *HandleT) GetTotalCountInTable(ctx context.Context, tableName string) (total int64, err error) {
fetchTotalCountExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{
fetchTotalCountExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{
"workspaceId": dl.Warehouse.WorkspaceID,
"destination": dl.Warehouse.Destination.ID,
"destType": dl.Warehouse.Type,
Expand Down
Loading