From b90df06c90b5c9eae0f44abe1855c1bb7b1da8ca Mon Sep 17 00:00:00 2001 From: Rohith BCS Date: Tue, 3 Sep 2024 14:44:23 +0530 Subject: [PATCH] chore: delete load files post successful upload --- .../warehouse/integration_test.go | 127 ------------------ warehouse/router/upload.go | 21 +-- 2 files changed, 11 insertions(+), 137 deletions(-) diff --git a/integration_test/warehouse/integration_test.go b/integration_test/warehouse/integration_test.go index e015a7ac0c..6c4f90a594 100644 --- a/integration_test/warehouse/integration_test.go +++ b/integration_test/warehouse/integration_test.go @@ -109,10 +109,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -174,10 +170,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -231,10 +223,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) 
requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -283,10 +271,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -412,10 +396,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -526,10 +506,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -602,10 +578,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) 
requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -803,10 +775,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events+(events/2), []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events+(events/2), []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -929,10 +897,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -954,10 +918,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -1018,10 +978,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) 
requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -1043,10 +999,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -1107,10 +1059,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -1132,10 +1080,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -1196,10 +1140,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) 
requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -1221,10 +1161,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -1288,10 +1224,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -1313,10 +1245,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -1380,10 +1308,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) 
requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -1405,10 +1329,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -1472,10 +1392,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -1497,10 +1413,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) requireTableUploadEventsCount(t, ctx, db, events*2, []lo.Tuple2[string, any]{ {A: "status", B: exportedData}, {A: "wh_uploads.source_id", B: sourceID}, @@ -1574,10 +1486,6 @@ func TestUploads(t *testing.T) { {A: "destination_id", B: destinationID}, {A: "status", B: succeeded}, }...) - requireLoadFileEventsCount(t, ctx, db, events*3, []lo.Tuple2[string, any]{ - {A: "source_id", B: sourceID}, - {A: "destination_id", B: destinationID}, - }...) 
requireTableUploadEventsCount(t, ctx, db, events*3, []lo.Tuple2[string, any]{ {A: "status", B: waiting}, {A: "wh_uploads.source_id", B: sourceID}, @@ -1787,41 +1695,6 @@ func requireStagingFileEventsCount( ) } -// nolint:unparam -func requireLoadFileEventsCount( - t testing.TB, - ctx context.Context, - db *sqlmw.DB, - expectedCount int, - filters ...lo.Tuple2[string, any], -) { - t.Helper() - - query := "SELECT COALESCE(sum(total_events), 0) FROM wh_load_files WHERE 1 = 1" - query += strings.Join(lo.Map(filters, func(t lo.Tuple2[string, any], index int) string { - return fmt.Sprintf(" AND %s = $%d", t.A, index+1) - }), "") - queryArgs := lo.Map(filters, func(t lo.Tuple2[string, any], _ int) any { - return t.B - }) - - require.Eventuallyf(t, - func() bool { - var eventsCount int - err := db.QueryRowContext(ctx, query, queryArgs...).Scan(&eventsCount) - if err != nil { - t.Logf("error getting load file events count: %v", err) - return false - } - t.Logf("Load file events count: %d", eventsCount) - return eventsCount == expectedCount - }, - 10*time.Second, - 250*time.Millisecond, - "expected load file events count to be %d", expectedCount, - ) -} - // nolint:unparam func requireTableUploadEventsCount( t testing.TB, diff --git a/warehouse/router/upload.go b/warehouse/router/upload.go index b8558985ac..683c096605 100644 --- a/warehouse/router/upload.go +++ b/warehouse/router/upload.go @@ -10,6 +10,8 @@ import ( "sync" "time" + obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" + "github.com/cenkalti/backoff/v4" "github.com/rudderlabs/rudder-go-kit/config" @@ -287,16 +289,14 @@ func (job *UploadJob) run() (err error) { defer whManager.Cleanup(job.ctx) if err = job.recovery.Recover(job.ctx, whManager, job.warehouse); err != nil { - job.logger.Warnw("Error during recovery (dangling staging table cleanup)", - logfield.DestinationID, job.warehouse.Destination.ID, - logfield.DestinationType, job.warehouse.Destination.DestinationDefinition.Name, - 
logfield.SourceID, job.warehouse.Source.ID, - logfield.SourceType, job.warehouse.Source.SourceDefinition.Name, - logfield.DestinationID, job.warehouse.Destination.ID, - logfield.DestinationType, job.warehouse.Destination.DestinationDefinition.Name, - logfield.WorkspaceID, job.warehouse.WorkspaceID, - logfield.Namespace, job.warehouse.Namespace, - logfield.Error, err.Error(), + job.logger.Warnn("Error during recovery (dangling staging table cleanup)", + obskit.DestinationID(job.warehouse.Destination.ID), + obskit.DestinationType(job.warehouse.Destination.DestinationDefinition.Name), + obskit.WorkspaceID(job.warehouse.WorkspaceID), + obskit.Namespace(job.warehouse.Namespace), + obskit.Error(err), + obskit.SourceID(job.warehouse.Source.ID), + obskit.SourceType(job.warehouse.Source.SourceDefinition.Name), ) _, _ = job.setUploadError(err, InternalProcessingFailed) return err @@ -424,6 +424,14 @@ func (job *UploadJob) run() (err error) { job.timerStat(nextUploadState.inProgress).SendTiming(time.Since(stateStartTime)) if newStatus == model.ExportedData { + // Best-effort cleanup: the upload has already succeeded, so a failed delete is logged instead of failing the job. + if delErr := job.loadFilesRepo.DeleteByStagingFiles(job.ctx, job.stagingFileIDs); delErr != nil { + job.logger.Warnn("Error deleting load files after successful upload", + obskit.DestinationID(job.warehouse.Destination.ID), + obskit.WorkspaceID(job.warehouse.WorkspaceID), + obskit.Error(delErr), + ) + } break }