From 8e3e729a7820a11382e0d0fff721078ff59f9392 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Mon, 2 Sep 2024 11:53:50 +0530 Subject: [PATCH 1/2] feat: enable retention period for warehouse uploads --- jobsdb/jobsdb.go | 7 +-- warehouse/app.go | 6 +-- warehouse/archive/archiver.go | 86 ++++++++++++++++++++++++++++-- warehouse/archive/archiver_test.go | 62 ++++++++++++++++++++- warehouse/archive/cron.go | 10 +++- 5 files changed, 158 insertions(+), 13 deletions(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 6f92f3b1b7..382964f346 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -1483,8 +1483,9 @@ func (jd *Handle) prepareAndExecStmtInTxAllowMissing(tx *sql.Tx, sqlStatement st _, err = stmt.Exec() if err != nil { - pqError, ok := err.(*pq.Error) - if ok && pqError.Code == pq.ErrorCode("42P01") { + var pqError *pq.Error + ok := errors.As(err, &pqError) + if ok && pqError.Code == ("42P01") { jd.logger.Infof("[%s] sql statement(%s) exec failed because table doesn't exist", jd.tablePrefix, sqlStatement) _, err = tx.Exec(rollbackSql) jd.assertError(err) @@ -3178,7 +3179,7 @@ func (jd *Handle) GetLastJob(ctx context.Context) *JobT { var job JobT sqlStatement := fmt.Sprintf(`SELECT %[1]s.job_id, %[1]s.uuid, %[1]s.user_id, %[1]s.parameters, %[1]s.custom_val, %[1]s.event_payload, %[1]s.created_at, %[1]s.expire_at FROM %[1]s WHERE %[1]s.job_id = %[2]d`, dsList[len(dsList)-1].JobTable, maxID) err := jd.dbHandle.QueryRow(sqlStatement).Scan(&job.JobID, &job.UUID, &job.UserID, &job.Parameters, &job.CustomVal, &job.EventPayload, &job.CreatedAt, &job.ExpireAt) - if err != nil && err != sql.ErrNoRows { + if err != nil && !errors.Is(err, sql.ErrNoRows) { jd.assertError(err) } return &job diff --git a/warehouse/app.go b/warehouse/app.go index 6bd6a1d460..2f6bdfb3ef 100644 --- a/warehouse/app.go +++ b/warehouse/app.go @@ -11,15 +11,13 @@ import ( "sync/atomic" "time" - "golang.org/x/sync/errgroup" - - "github.com/rudderlabs/rudder-go-kit/sqlutil" - "github.com/cenkalti/backoff/v4" + "golang.org/x/sync/errgroup" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/filemanager" "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/sqlutil" "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/admin" diff --git a/warehouse/archive/archiver.go b/warehouse/archive/archiver.go index f546d5ea94..c8590459d2 100644 --- a/warehouse/archive/archiver.go +++ b/warehouse/archive/archiver.go @@ -9,10 +9,6 @@ import ( "net/url" "time" - sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" - - "github.com/rudderlabs/rudder-go-kit/stats" - "github.com/iancoleman/strcase" "github.com/lib/pq" "github.com/tidwall/gjson" @@ -21,11 +17,13 @@ import ( "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/filemanager" "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/services/archiver/tablearchiver" "github.com/rudderlabs/rudder-server/utils/filemanagerutil" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/timeutil" + sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" "github.com/rudderlabs/rudder-server/warehouse/internal/model" "github.com/rudderlabs/rudder-server/warehouse/multitenant" warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" @@ -61,7 +59,9 @@ type Archiver struct { config 
struct { archiveUploadRelatedRecords config.ValueLoader[bool] + canDeleteUploads config.ValueLoader[bool] uploadsArchivalTimeInDays config.ValueLoader[int] + uploadRetentionTimeInDays config.ValueLoader[int] archiverTickerTime config.ValueLoader[time.Duration] backupRowsBatchSize config.ValueLoader[int] maxLimit config.ValueLoader[int] @@ -88,7 +88,9 @@ func New( } a.config.archiveUploadRelatedRecords = a.conf.GetReloadableBoolVar(true, "Warehouse.archiveUploadRelatedRecords") + a.config.canDeleteUploads = a.conf.GetReloadableBoolVar(false, "Warehouse.canDeleteUploads") a.config.uploadsArchivalTimeInDays = a.conf.GetReloadableIntVar(5, 1, "Warehouse.uploadsArchivalTimeInDays") + a.config.uploadRetentionTimeInDays = a.conf.GetReloadableIntVar(90, 1, "Warehouse.uploadRetentionTimeInDays") a.config.backupRowsBatchSize = a.conf.GetReloadableIntVar(100, 1, "Warehouse.Archiver.backupRowsBatchSize") a.config.archiverTickerTime = a.conf.GetReloadableDurationVar(360, time.Minute, "Warehouse.archiverTickerTime", "Warehouse.archiverTickerTimeInMin") // default 6 hours a.config.maxLimit = a.conf.GetReloadableIntVar(10000, 1, "Warehouse.Archiver.maxLimit") @@ -510,3 +512,79 @@ func (a *Archiver) deleteLoadFileRecords( return nil } + +func (a *Archiver) Delete(ctx context.Context) error { + a.log.Infof(`[Archiver]: Started deleting for warehouse`) + + uploadsToDelete, err := a.countUploadsToDelete(ctx) + if err != nil { + return fmt.Errorf("counting uploads to delete: %w", err) + } + + maxLimit := a.config.maxLimit.Load() + + for uploadsToDelete > 0 { + if err := a.deleteUploads(ctx, maxLimit); err != nil { + return fmt.Errorf("deleting uploads: %w", err) + } + uploadsToDelete -= maxLimit + } + return nil +} + +func (a *Archiver) countUploadsToDelete(ctx context.Context) (int, error) { + skipWorkspaceIDs := []string{""} + skipWorkspaceIDs = append(skipWorkspaceIDs, a.tenantManager.DegradedWorkspaces()...) + + sqlStatement := fmt.Sprintf(` + SELECT + count(*) + FROM + %s + WHERE + created_at < NOW() - $1::interval + AND status = $2 + AND NOT workspace_id = ANY ( $3 );`, + pq.QuoteIdentifier(warehouseutils.WarehouseUploadsTable), + ) + + var totalUploads int + + err := a.db.QueryRowContext( + ctx, + sqlStatement, + fmt.Sprintf("%d DAY", a.config.uploadRetentionTimeInDays.Load()), + model.ExportedData, + pq.Array(skipWorkspaceIDs), + ).Scan(&totalUploads) + return totalUploads, err +} + +func (a *Archiver) deleteUploads(ctx context.Context, limit int) error { + skipWorkspaceIDs := []string{""} + skipWorkspaceIDs = append(skipWorkspaceIDs, a.tenantManager.DegradedWorkspaces()...) 
+ + sqlStatement := fmt.Sprintf(` + WITH rows_to_delete AS ( + SELECT ctid + FROM %[1]s + WHERE created_at < NOW() - $1::interval + AND status = $2 + AND NOT workspace_id = ANY ($3) + LIMIT $4 + ) + DELETE FROM %[1]s + WHERE ctid IN (SELECT ctid FROM rows_to_delete); +`, + pq.QuoteIdentifier(warehouseutils.WarehouseUploadsTable)) + + _, err := a.db.ExecContext( + ctx, + sqlStatement, + fmt.Sprintf("%d DAY", a.config.uploadRetentionTimeInDays.Load()), + model.ExportedData, + pq.Array(skipWorkspaceIDs), + limit, + ) + return err +} diff --git a/warehouse/archive/archiver_test.go b/warehouse/archive/archiver_test.go index 188e3d1b4f..c8a6e7e586 100644 --- a/warehouse/archive/archiver_test.go +++ b/warehouse/archive/archiver_test.go @@ -155,7 +155,7 @@ func TestArchiver(t *testing.T) { db := sqlmw.New(pgResource.DB) archiver := archive.New( - config.New(), + c, logger.NOP, mockStats, db, @@ -214,6 +214,66 @@ func TestArchiver(t *testing.T) { } } +func TestArchiver_Delete(t *testing.T) { + var pgResource *postgres.Resource + + pool, err := dockertest.NewPool("") + require.NoError(t, err) + pgResource, err = postgres.Setup(pool, t) + require.NoError(t, err) + + t.Log("db:", pgResource.DBDsn) + + err = (&migrator.Migrator{ + Handle: pgResource.DB, + MigrationsTable: "wh_schema_migrations", + }).Migrate("warehouse") + require.NoError(t, err) + + sqlStatement, err := os.ReadFile("testdata/dump.sql") + require.NoError(t, err) + + _, err = pgResource.DB.Exec(string(sqlStatement)) + require.NoError(t, err) + + ctrl := gomock.NewController(t) + mockStats := mock_stats.NewMockStats(ctrl) + mockStats.EXPECT().NewStat(gomock.Any(), gomock.Any()).Times(1) + + status := model.ExportedData + workspaceID := "1" + _, err = pgResource.DB.Exec(` + UPDATE wh_uploads SET workspace_id = $1, status = $2 + `, workspaceID, status) + require.NoError(t, err) + + c := config.New() + c.Set("Warehouse.uploadRetentionTimeInDays", 0) + tenantManager := multitenant.New(c, backendConfig.DefaultBackendConfig) + + db := sqlmw.New(pgResource.DB) + + archiver := archive.New( + c, + logger.NOP, + mockStats, + db, + filemanager.New, + tenantManager, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err = archiver.Delete(ctx) + require.NoError(t, err) + + var count int + err = pgResource.DB.QueryRow(fmt.Sprintf(`SELECT COUNT(*) FROM %q`, warehouseutils.WarehouseUploadsTable)).Scan(&count) + require.NoError(t, err) + require.Equal(t, 0, count, "wh_uploads rows should be deleted") +} + func jsonTestData(t require.TestingT, file string, value any) { f, err := os.Open(file) require.NoError(t, err) diff --git a/warehouse/archive/cron.go b/warehouse/archive/cron.go index d01b22dcc5..0d16a9257c 100644 --- a/warehouse/archive/cron.go +++ b/warehouse/archive/cron.go @@ -3,6 +3,8 @@ package archive import ( "context" "time" + + "github.com/rudderlabs/rudder-go-kit/logger" ) func CronArchiver(ctx context.Context, a *Archiver) { @@ -15,7 +17,13 @@ func CronArchiver(ctx context.Context, a *Archiver) { if a.config.archiveUploadRelatedRecords.Load() { err := a.Do(ctx) if err != nil { - a.log.Errorf(`Error archiving uploads: %v`, err) + a.log.Errorn(`Error archiving uploads`, logger.NewErrorField(err)) + } + } + if a.config.canDeleteUploads.Load() { + err := a.Delete(ctx) + if err != nil { + a.log.Errorn(`Error deleting uploads`, logger.NewErrorField(err)) } } } From cba7adbeecba951c8bfcd306cc0402367ebe65a6 Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Tue, 3 Sep 2024 17:34:59 +0530 Subject: [PATCH 2/2] chore: 
review comments --- warehouse/archive/archiver.go | 53 ++++++++---------------------- warehouse/archive/archiver_test.go | 2 +- warehouse/archive/cron.go | 6 ++-- 3 files changed, 17 insertions(+), 44 deletions(-) diff --git a/warehouse/archive/archiver.go b/warehouse/archive/archiver.go index c8590459d2..f8e623b04e 100644 --- a/warehouse/archive/archiver.go +++ b/warehouse/archive/archiver.go @@ -514,53 +514,23 @@ func (a *Archiver) deleteLoadFileRecords( } func (a *Archiver) Delete(ctx context.Context) error { - a.log.Infof(`[Archiver]: Started deleting for warehouse`) - - uploadsToDelete, err := a.countUploadsToDelete(ctx) - if err != nil { - return fmt.Errorf("counting uploads to delete: %w", err) - } + a.log.Infon(`Started deleting for warehouse`) maxLimit := a.config.maxLimit.Load() - for uploadsToDelete > 0 { - if err := a.deleteUploads(ctx, maxLimit); err != nil { + for { + count, err := a.deleteUploads(ctx, maxLimit) + if err != nil { return fmt.Errorf("deleting uploads: %w", err) } - uploadsToDelete -= maxLimit + if count == 0 { + break + } } return nil } -func (a *Archiver) countUploadsToDelete(ctx context.Context) (int, error) { - skipWorkspaceIDs := []string{""} - skipWorkspaceIDs = append(skipWorkspaceIDs, a.tenantManager.DegradedWorkspaces()...) - - sqlStatement := fmt.Sprintf(` - SELECT - count(*) - FROM - %s - WHERE - created_at < NOW() - $1::interval - AND status = $2 - AND NOT workspace_id = ANY ( $3 );`, - pq.QuoteIdentifier(warehouseutils.WarehouseUploadsTable), - ) - - var totalUploads int - - err := a.db.QueryRowContext( - ctx, - sqlStatement, - fmt.Sprintf("%d DAY", a.config.uploadRetentionTimeInDays.Load()), - model.ExportedData, - pq.Array(skipWorkspaceIDs), - ).Scan(&totalUploads) - return totalUploads, err -} - -func (a *Archiver) deleteUploads(ctx context.Context, limit int) error { +func (a *Archiver) deleteUploads(ctx context.Context, limit int) (int64, error) { skipWorkspaceIDs := []string{""} skipWorkspaceIDs = append(skipWorkspaceIDs, a.tenantManager.DegradedWorkspaces()...) 
@@ -578,7 +548,7 @@ func (a *Archiver) deleteUploads(ctx context.Context, limit int) error { `, pq.QuoteIdentifier(warehouseutils.WarehouseUploadsTable)) - _, err := a.db.ExecContext( + result, err := a.db.ExecContext( ctx, sqlStatement, fmt.Sprintf("%d DAY", a.config.uploadRetentionTimeInDays.Load()), @@ -586,5 +556,8 @@ func (a *Archiver) deleteUploads(ctx context.Context, limit int) error { pq.Array(skipWorkspaceIDs), limit, ) - return err + if err != nil { + return 0, fmt.Errorf("error deleting uploads: %w", err) + } + return result.RowsAffected() } diff --git a/warehouse/archive/archiver_test.go b/warehouse/archive/archiver_test.go index c8a6e7e586..62731bf216 100644 --- a/warehouse/archive/archiver_test.go +++ b/warehouse/archive/archiver_test.go @@ -271,7 +271,7 @@ func TestArchiver_Delete(t *testing.T) { var count int err = pgResource.DB.QueryRow(fmt.Sprintf(`SELECT COUNT(*) FROM %q`, warehouseutils.WarehouseUploadsTable)).Scan(&count) require.NoError(t, err) - require.Equal(t, 0, count, "wh_uploads rows should be deleted") + require.Zero(t, count, "wh_uploads rows should be deleted") } func jsonTestData(t require.TestingT, file string, value any) { diff --git a/warehouse/archive/cron.go b/warehouse/archive/cron.go index 0d16a9257c..13e5c3a199 100644 --- a/warehouse/archive/cron.go +++ b/warehouse/archive/cron.go @@ -4,7 +4,7 @@ import ( "context" "time" - "github.com/rudderlabs/rudder-go-kit/logger" + obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" ) func CronArchiver(ctx context.Context, a *Archiver) { @@ -17,13 +17,13 @@ func CronArchiver(ctx context.Context, a *Archiver) { if a.config.archiveUploadRelatedRecords.Load() { err := a.Do(ctx) if err != nil { - a.log.Errorn(`Error archiving uploads`, logger.NewErrorField(err)) + a.log.Errorn(`Error archiving uploads`, obskit.Error(err)) } } if a.config.canDeleteUploads.Load() { err := a.Delete(ctx) if err != nil { - a.log.Errorn(`Error deleting uploads`, logger.NewErrorField(err)) + a.log.Errorn(`Error deleting uploads`, obskit.Error(err)) } } }
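
Taken together, the two commits settle on a simple retention scheme: deleteUploads removes rows older than Warehouse.uploadRetentionTimeInDays in maxLimit-sized batches, selecting victim rows by Postgres ctid with a LIMIT, and Delete keeps issuing batches until RowsAffected() reports zero. Below is a minimal standalone sketch of that pattern against a plain database/sql handle; the package name, function name, the table name "wh_uploads", and the status literal "exported_data" are illustrative stand-ins for the method on Archiver, warehouseutils.WarehouseUploadsTable, and model.ExportedData, and the degraded-workspace filter is dropped for brevity:

package retention

import (
	"context"
	"database/sql"
	"fmt"

	"github.com/lib/pq"
)

// deleteOldUploads mirrors the batched-delete loop the patch settles on:
// each pass removes at most `limit` rows whose created_at falls outside
// the retention window, and the caller stops once a pass deletes nothing.
func deleteOldUploads(ctx context.Context, db *sql.DB, retentionDays, limit int) error {
	// "wh_uploads" and "exported_data" stand in for
	// warehouseutils.WarehouseUploadsTable and model.ExportedData.
	stmt := fmt.Sprintf(`
		WITH rows_to_delete AS (
			SELECT ctid
			FROM %[1]s
			WHERE created_at < NOW() - $1::interval
			  AND status = $2
			LIMIT $3
		)
		DELETE FROM %[1]s
		WHERE ctid IN (SELECT ctid FROM rows_to_delete);`,
		pq.QuoteIdentifier("wh_uploads"))

	for {
		res, err := db.ExecContext(ctx, stmt,
			fmt.Sprintf("%d DAY", retentionDays), // e.g. "90 DAY", cast by Postgres to an interval
			"exported_data",
			limit,
		)
		if err != nil {
			return fmt.Errorf("deleting uploads: %w", err)
		}
		deleted, err := res.RowsAffected()
		if err != nil {
			return fmt.Errorf("rows affected: %w", err)
		}
		if deleted == 0 { // nothing older than the retention window remains
			return nil
		}
	}
}

Selecting by ctid keeps each DELETE cheap without requiring a unique key, and the fixed batch size bounds how long any single statement holds locks. The second commit's RowsAffected() loop also removes the race the first commit had between countUploadsToDelete and the subsequent deletes, which is why the counting query could be dropped entirely.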
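
Operationally the feature ships dark: Warehouse.canDeleteUploads defaults to false, Warehouse.uploadRetentionTimeInDays defaults to 90 (minimum 1), and CronArchiver checks the flag on the existing Warehouse.archiverTickerTime cadence (default 6 hours), independently of archiveUploadRelatedRecords. A sketch of turning it on in code, mirroring how TestArchiver_Delete seeds the settings with rudder-go-kit's config:

package retention

import "github.com/rudderlabs/rudder-go-kit/config"

// retentionConfig flips the new knobs on, the same way TestArchiver_Delete
// seeds them; both are reloadable ValueLoaders, so they can also change at runtime.
func retentionConfig() *config.Config {
	c := config.New()
	c.Set("Warehouse.canDeleteUploads", true)        // default false: archive-only behaviour
	c.Set("Warehouse.uploadRetentionTimeInDays", 90) // delete ExportedData uploads older than 90 days
	return c
}

Keeping deletion behind its own reloadable flag, separate from archiveUploadRelatedRecords, lets operators archive without deleting, or switch deletion off without a restart.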
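
The drive-by jobsdb cleanup is also worth a note: swapping the direct type assertion err.(*pq.Error) for errors.As (and err != sql.ErrNoRows for errors.Is) means both checks still match when the underlying error reaches the caller wrapped. A small illustration; the fmt.Errorf wrapping here is hypothetical, not something lib/pq itself does:

package retention

import (
	"errors"
	"fmt"

	"github.com/lib/pq"
)

// isUndefinedTable reports SQLSTATE 42P01 ("undefined_table").
// errors.As walks the wrap chain, so the check still fires where a
// bare err.(*pq.Error) assertion would see only the outer error.
func isUndefinedTable(err error) bool {
	var pqErr *pq.Error
	return errors.As(err, &pqErr) && pqErr.Code == "42P01"
}

func wrappedExample() bool {
	err := fmt.Errorf("exec failed: %w", &pq.Error{Code: "42P01"})
	return isUndefinedTable(err) // true; a direct type assertion would return false
}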