Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable retention period for warehouse uploads #5045

Merged
merged 4 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,8 +1483,9 @@

_, err = stmt.Exec()
if err != nil {
pqError, ok := err.(*pq.Error)
if ok && pqError.Code == pq.ErrorCode("42P01") {
var pqError *pq.Error
ok := errors.As(err, &pqError)
if ok && pqError.Code == ("42P01") {

Check warning on line 1488 in jobsdb/jobsdb.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/jobsdb.go#L1486-L1488

Added lines #L1486 - L1488 were not covered by tests
jd.logger.Infof("[%s] sql statement(%s) exec failed because table doesn't exist", jd.tablePrefix, sqlStatement)
_, err = tx.Exec(rollbackSql)
jd.assertError(err)
Expand Down Expand Up @@ -3178,7 +3179,7 @@
var job JobT
sqlStatement := fmt.Sprintf(`SELECT %[1]s.job_id, %[1]s.uuid, %[1]s.user_id, %[1]s.parameters, %[1]s.custom_val, %[1]s.event_payload, %[1]s.created_at, %[1]s.expire_at FROM %[1]s WHERE %[1]s.job_id = %[2]d`, dsList[len(dsList)-1].JobTable, maxID)
err := jd.dbHandle.QueryRow(sqlStatement).Scan(&job.JobID, &job.UUID, &job.UserID, &job.Parameters, &job.CustomVal, &job.EventPayload, &job.CreatedAt, &job.ExpireAt)
if err != nil && err != sql.ErrNoRows {
if err != nil && !errors.Is(err, sql.ErrNoRows) {

Check warning on line 3182 in jobsdb/jobsdb.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/jobsdb.go#L3182

Added line #L3182 was not covered by tests
jd.assertError(err)
}
return &job
Expand Down
6 changes: 2 additions & 4 deletions warehouse/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ import (
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/sqlutil"

"github.com/cenkalti/backoff/v4"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/sqlutil"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/admin"
Expand Down
59 changes: 55 additions & 4 deletions warehouse/archive/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@
"net/url"
"time"

sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"

"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/iancoleman/strcase"
"github.com/lib/pq"
"github.com/tidwall/gjson"
Expand All @@ -21,11 +17,13 @@
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/services/archiver/tablearchiver"
"github.com/rudderlabs/rudder-server/utils/filemanagerutil"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/timeutil"
sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/multitenant"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
Expand Down Expand Up @@ -61,7 +59,9 @@

config struct {
archiveUploadRelatedRecords config.ValueLoader[bool]
canDeleteUploads config.ValueLoader[bool]
uploadsArchivalTimeInDays config.ValueLoader[int]
uploadRetentionTimeInDays config.ValueLoader[int]
archiverTickerTime config.ValueLoader[time.Duration]
backupRowsBatchSize config.ValueLoader[int]
maxLimit config.ValueLoader[int]
Expand All @@ -88,7 +88,9 @@
}

a.config.archiveUploadRelatedRecords = a.conf.GetReloadableBoolVar(true, "Warehouse.archiveUploadRelatedRecords")
a.config.canDeleteUploads = a.conf.GetReloadableBoolVar(false, "Warehouse.canDeleteUploads")
a.config.uploadsArchivalTimeInDays = a.conf.GetReloadableIntVar(5, 1, "Warehouse.uploadsArchivalTimeInDays")
a.config.uploadRetentionTimeInDays = a.conf.GetReloadableIntVar(90, 1, "Warehouse.uploadRetentionTimeInDays")
a.config.backupRowsBatchSize = a.conf.GetReloadableIntVar(100, 1, "Warehouse.Archiver.backupRowsBatchSize")
a.config.archiverTickerTime = a.conf.GetReloadableDurationVar(360, time.Minute, "Warehouse.archiverTickerTime", "Warehouse.archiverTickerTimeInMin") // default 6 hours
a.config.maxLimit = a.conf.GetReloadableIntVar(10000, 1, "Warehouse.Archiver.maxLimit")
Expand Down Expand Up @@ -510,3 +512,52 @@

return nil
}

func (a *Archiver) Delete(ctx context.Context) error {
a.log.Infon(`Started deleting for warehouse`)

maxLimit := a.config.maxLimit.Load()

for {
count, err := a.deleteUploads(ctx, maxLimit)
if err != nil {
return fmt.Errorf("deleting uploads: %w", err)

Check warning on line 524 in warehouse/archive/archiver.go

View check run for this annotation

Codecov / codecov/patch

warehouse/archive/archiver.go#L524

Added line #L524 was not covered by tests
}
if count == 0 {
break
}
}
return nil
}

func (a *Archiver) deleteUploads(ctx context.Context, limit int) (int64, error) {
skipWorkspaceIDs := []string{""}
skipWorkspaceIDs = append(skipWorkspaceIDs, a.tenantManager.DegradedWorkspaces()...)

sqlStatement := fmt.Sprintf(`
WITH rows_to_delete AS (
SELECT ctid
FROM %[1]s
WHERE created_at < NOW() - $1::interval
AND status = $2
AND NOT workspace_id = ANY ($3)
LIMIT $4
)
DELETE FROM %[1]s
WHERE ctid IN (SELECT ctid FROM rows_to_delete);
`,
pq.QuoteIdentifier(warehouseutils.WarehouseUploadsTable))

result, err := a.db.ExecContext(
ctx,
sqlStatement,
fmt.Sprintf("%d DAY", a.config.uploadRetentionTimeInDays.Load()),
model.ExportedData,
pq.Array(skipWorkspaceIDs),
limit,
)
if err != nil {
return 0, fmt.Errorf("error deleting uploads: %w", err)

Check warning on line 560 in warehouse/archive/archiver.go

View check run for this annotation

Codecov / codecov/patch

warehouse/archive/archiver.go#L560

Added line #L560 was not covered by tests
}
return result.RowsAffected()
}
62 changes: 61 additions & 1 deletion warehouse/archive/archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestArchiver(t *testing.T) {
db := sqlmw.New(pgResource.DB)

archiver := archive.New(
config.New(),
c,
logger.NOP,
mockStats,
db,
Expand Down Expand Up @@ -214,6 +214,66 @@ func TestArchiver(t *testing.T) {
}
}

func TestArchiver_Delete(t *testing.T) {
var pgResource *postgres.Resource

pool, err := dockertest.NewPool("")
require.NoError(t, err)
pgResource, err = postgres.Setup(pool, t)
require.NoError(t, err)

t.Log("db:", pgResource.DBDsn)

err = (&migrator.Migrator{
Handle: pgResource.DB,
MigrationsTable: "wh_schema_migrations",
}).Migrate("warehouse")
require.NoError(t, err)

sqlStatement, err := os.ReadFile("testdata/dump.sql")
require.NoError(t, err)

_, err = pgResource.DB.Exec(string(sqlStatement))
require.NoError(t, err)

ctrl := gomock.NewController(t)
mockStats := mock_stats.NewMockStats(ctrl)
mockStats.EXPECT().NewStat(gomock.Any(), gomock.Any()).Times(1)

status := model.ExportedData
workspaceID := "1"
_, err = pgResource.DB.Exec(`
UPDATE wh_uploads SET workspace_id = $1, status = $2
`, workspaceID, status)
require.NoError(t, err)

c := config.New()
c.Set("Warehouse.uploadRetentionTimeInDays", 0)
tenantManager := multitenant.New(c, backendConfig.DefaultBackendConfig)

db := sqlmw.New(pgResource.DB)

archiver := archive.New(
c,
logger.NOP,
mockStats,
db,
filemanager.New,
tenantManager,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err = archiver.Delete(ctx)
require.NoError(t, err)

var count int
err = pgResource.DB.QueryRow(fmt.Sprintf(`SELECT COUNT(*) FROM %q`, warehouseutils.WarehouseUploadsTable)).Scan(&count)
require.NoError(t, err)
require.Zero(t, count, "wh_uploads rows should be deleted")
}

func jsonTestData(t require.TestingT, file string, value any) {
f, err := os.Open(file)
require.NoError(t, err)
Expand Down
10 changes: 9 additions & 1 deletion warehouse/archive/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import (
"context"
"time"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
)

func CronArchiver(ctx context.Context, a *Archiver) {
Expand All @@ -15,7 +17,13 @@
if a.config.archiveUploadRelatedRecords.Load() {
err := a.Do(ctx)
if err != nil {
a.log.Errorf(`Error archiving uploads: %v`, err)
a.log.Errorn(`Error archiving uploads`, obskit.Error(err))

Check warning on line 20 in warehouse/archive/cron.go

View check run for this annotation

Codecov / codecov/patch

warehouse/archive/cron.go#L20

Added line #L20 was not covered by tests
}
}
if a.config.canDeleteUploads.Load() {
err := a.Delete(ctx)
if err != nil {
a.log.Errorn(`Error deleting uploads`, obskit.Error(err))

Check warning on line 26 in warehouse/archive/cron.go

View check run for this annotation

Codecov / codecov/patch

warehouse/archive/cron.go#L24-L26

Added lines #L24 - L26 were not covered by tests
}
}
}
Expand Down
Loading