Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable retention period for warehouse uploads #5045

Merged
merged 4 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,8 +1483,9 @@ func (jd *Handle) prepareAndExecStmtInTxAllowMissing(tx *sql.Tx, sqlStatement st

_, err = stmt.Exec()
if err != nil {
pqError, ok := err.(*pq.Error)
if ok && pqError.Code == pq.ErrorCode("42P01") {
var pqError *pq.Error
ok := errors.As(err, &pqError)
if ok && pqError.Code == ("42P01") {
jd.logger.Infof("[%s] sql statement(%s) exec failed because table doesn't exist", jd.tablePrefix, sqlStatement)
_, err = tx.Exec(rollbackSql)
jd.assertError(err)
Expand Down Expand Up @@ -3178,7 +3179,7 @@ func (jd *Handle) GetLastJob(ctx context.Context) *JobT {
var job JobT
sqlStatement := fmt.Sprintf(`SELECT %[1]s.job_id, %[1]s.uuid, %[1]s.user_id, %[1]s.parameters, %[1]s.custom_val, %[1]s.event_payload, %[1]s.created_at, %[1]s.expire_at FROM %[1]s WHERE %[1]s.job_id = %[2]d`, dsList[len(dsList)-1].JobTable, maxID)
err := jd.dbHandle.QueryRow(sqlStatement).Scan(&job.JobID, &job.UUID, &job.UserID, &job.Parameters, &job.CustomVal, &job.EventPayload, &job.CreatedAt, &job.ExpireAt)
if err != nil && err != sql.ErrNoRows {
if err != nil && !errors.Is(err, sql.ErrNoRows) {
jd.assertError(err)
}
return &job
Expand Down
6 changes: 2 additions & 4 deletions warehouse/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ import (
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/sqlutil"

"github.com/cenkalti/backoff/v4"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/sqlutil"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/admin"
Expand Down
86 changes: 82 additions & 4 deletions warehouse/archive/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ import (
"net/url"
"time"

sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"

"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/iancoleman/strcase"
"github.com/lib/pq"
"github.com/tidwall/gjson"
Expand All @@ -21,11 +17,13 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/services/archiver/tablearchiver"
"github.com/rudderlabs/rudder-server/utils/filemanagerutil"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/timeutil"
sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/multitenant"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
Expand Down Expand Up @@ -61,7 +59,9 @@ type Archiver struct {

config struct {
archiveUploadRelatedRecords config.ValueLoader[bool]
canDeleteUploads config.ValueLoader[bool]
uploadsArchivalTimeInDays config.ValueLoader[int]
uploadRetentionTimeInDays config.ValueLoader[int]
archiverTickerTime config.ValueLoader[time.Duration]
backupRowsBatchSize config.ValueLoader[int]
maxLimit config.ValueLoader[int]
Expand All @@ -88,7 +88,9 @@ func New(
}

a.config.archiveUploadRelatedRecords = a.conf.GetReloadableBoolVar(true, "Warehouse.archiveUploadRelatedRecords")
a.config.canDeleteUploads = a.conf.GetReloadableBoolVar(false, "Warehouse.canDeleteUploads")
a.config.uploadsArchivalTimeInDays = a.conf.GetReloadableIntVar(5, 1, "Warehouse.uploadsArchivalTimeInDays")
a.config.uploadRetentionTimeInDays = a.conf.GetReloadableIntVar(90, 1, "Warehouse.uploadRetentionTimeInDays")
a.config.backupRowsBatchSize = a.conf.GetReloadableIntVar(100, 1, "Warehouse.Archiver.backupRowsBatchSize")
a.config.archiverTickerTime = a.conf.GetReloadableDurationVar(360, time.Minute, "Warehouse.archiverTickerTime", "Warehouse.archiverTickerTimeInMin") // default 6 hours
a.config.maxLimit = a.conf.GetReloadableIntVar(10000, 1, "Warehouse.Archiver.maxLimit")
Expand Down Expand Up @@ -510,3 +512,79 @@ func (a *Archiver) deleteLoadFileRecords(

return nil
}

func (a *Archiver) Delete(ctx context.Context) error {
a.log.Infof(`[Archiver]: Started deleting for warehouse`)
cisse21 marked this conversation as resolved.
Show resolved Hide resolved

uploadsToDelete, err := a.countUploadsToDelete(ctx)
if err != nil {
return fmt.Errorf("counting uploads to delete: %w", err)
}

maxLimit := a.config.maxLimit.Load()

for uploadsToDelete > 0 {
if err := a.deleteUploads(ctx, maxLimit); err != nil {
return fmt.Errorf("deleting uploads: %w", err)
}
uploadsToDelete -= maxLimit
}
cisse21 marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// countUploadsToDelete returns the number of rows in the warehouse uploads
// table that are older than the configured retention period, have status
// model.ExportedData, and do not belong to an excluded workspace.
//
// Excluded workspaces are the empty workspace ID plus every workspace reported
// by the tenant manager's DegradedWorkspaces.
func (a *Archiver) countUploadsToDelete(ctx context.Context) (int, error) {
	excludedWorkspaces := append([]string{""}, a.tenantManager.DegradedWorkspaces()...)

	// Passed as a bind parameter and cast to an interval inside the query.
	retention := fmt.Sprintf("%d DAY", a.config.uploadRetentionTimeInDays.Load())

	query := fmt.Sprintf(`
SELECT
count(*)
FROM
%s
WHERE
created_at < NOW() - $1::interval
AND status = $2
AND NOT workspace_id = ANY ( $3 );`,
		pq.QuoteIdentifier(warehouseutils.WarehouseUploadsTable),
	)

	var total int
	row := a.db.QueryRowContext(ctx, query, retention, model.ExportedData, pq.Array(excludedWorkspaces))
	err := row.Scan(&total)
	return total, err
}

// deleteUploads deletes at most limit rows from the warehouse uploads table.
//
// A row is eligible when it is older than the configured retention period,
// has status model.ExportedData, and its workspace is neither the empty
// workspace ID nor one reported by the tenant manager's DegradedWorkspaces.
// The rows to remove are picked by ctid in a CTE so that the LIMIT can be
// applied to the DELETE.
func (a *Archiver) deleteUploads(ctx context.Context, limit int) error {
	excludedWorkspaces := append([]string{""}, a.tenantManager.DegradedWorkspaces()...)

	// Passed as a bind parameter and cast to an interval inside the query.
	retention := fmt.Sprintf("%d DAY", a.config.uploadRetentionTimeInDays.Load())

	query := fmt.Sprintf(`
WITH rows_to_delete AS (
SELECT ctid
FROM %[1]s
WHERE created_at < NOW() - $1::interval
AND status = $2
AND NOT workspace_id = ANY ($3)
LIMIT $4
)
DELETE FROM %[1]s
WHERE ctid IN (SELECT ctid FROM rows_to_delete);
`,
		pq.QuoteIdentifier(warehouseutils.WarehouseUploadsTable))

	if _, err := a.db.ExecContext(ctx, query, retention, model.ExportedData, pq.Array(excludedWorkspaces), limit); err != nil {
		return err
	}
	return nil
}
62 changes: 61 additions & 1 deletion warehouse/archive/archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestArchiver(t *testing.T) {
db := sqlmw.New(pgResource.DB)

archiver := archive.New(
config.New(),
c,
logger.NOP,
mockStats,
db,
Expand Down Expand Up @@ -214,6 +214,66 @@ func TestArchiver(t *testing.T) {
}
}

func TestArchiver_Delete(t *testing.T) {
var pgResource *postgres.Resource

pool, err := dockertest.NewPool("")
require.NoError(t, err)
pgResource, err = postgres.Setup(pool, t)
require.NoError(t, err)

t.Log("db:", pgResource.DBDsn)

err = (&migrator.Migrator{
Handle: pgResource.DB,
MigrationsTable: "wh_schema_migrations",
}).Migrate("warehouse")
require.NoError(t, err)

sqlStatement, err := os.ReadFile("testdata/dump.sql")
require.NoError(t, err)

_, err = pgResource.DB.Exec(string(sqlStatement))
require.NoError(t, err)

ctrl := gomock.NewController(t)
mockStats := mock_stats.NewMockStats(ctrl)
mockStats.EXPECT().NewStat(gomock.Any(), gomock.Any()).Times(1)

status := model.ExportedData
workspaceID := "1"
_, err = pgResource.DB.Exec(`
UPDATE wh_uploads SET workspace_id = $1, status = $2
`, workspaceID, status)
require.NoError(t, err)

c := config.New()
c.Set("Warehouse.uploadRetentionTimeInDays", 0)
tenantManager := multitenant.New(c, backendConfig.DefaultBackendConfig)

db := sqlmw.New(pgResource.DB)

archiver := archive.New(
c,
logger.NOP,
mockStats,
db,
filemanager.New,
tenantManager,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err = archiver.Delete(ctx)
require.NoError(t, err)

var count int
err = pgResource.DB.QueryRow(fmt.Sprintf(`SELECT COUNT(*) FROM %q`, warehouseutils.WarehouseUploadsTable)).Scan(&count)
require.NoError(t, err)
require.Equal(t, 0, count, "wh_uploads rows should be deleted")
cisse21 marked this conversation as resolved.
Show resolved Hide resolved
}

func jsonTestData(t require.TestingT, file string, value any) {
f, err := os.Open(file)
require.NoError(t, err)
Expand Down
10 changes: 9 additions & 1 deletion warehouse/archive/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package archive
import (
"context"
"time"

"github.com/rudderlabs/rudder-go-kit/logger"
)

func CronArchiver(ctx context.Context, a *Archiver) {
Expand All @@ -15,7 +17,13 @@ func CronArchiver(ctx context.Context, a *Archiver) {
if a.config.archiveUploadRelatedRecords.Load() {
err := a.Do(ctx)
if err != nil {
a.log.Errorf(`Error archiving uploads: %v`, err)
a.log.Errorn(`Error archiving uploads`, logger.NewErrorField(err))
}
}
if a.config.canDeleteUploads.Load() {
err := a.Delete(ctx)
if err != nil {
a.log.Errorn(`Error deleting uploads`, logger.NewErrorField(err))
cisse21 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down
Loading