chore: cleanup warehouse (#5150)
achettyiitr authored Oct 7, 2024
1 parent 87283c4 commit 7818610
Showing 14 changed files with 54 additions and 174 deletions.
4 changes: 2 additions & 2 deletions warehouse/app.go
@@ -459,8 +459,8 @@ func (a *App) onConfigDataEvent(
 enabledDestinations := make(map[string]bool)
 diffRouters := make(map[string]*router.Router)
 for _, wConfig := range configMap {
-for _, source := range wConfig.Sources {
-for _, destination := range source.Destinations {
+for _, sConfig := range wConfig.Sources {
+for _, destination := range sConfig.Destinations {
 enabledDestinations[destination.DestinationDefinition.Name] = true
 
 if !slices.Contains(warehouseutils.WarehouseDestinations, destination.DestinationDefinition.Name) {
36 changes: 14 additions & 22 deletions warehouse/integrations/deltalake/deltalake_test.go
@@ -559,7 +559,7 @@ func TestIntegration(t *testing.T) {
 uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
 loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
-mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
+mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)
 
 d := deltalake.New(config.New(), logger.NOP, stats.NOP)
 err := d.Setup(ctx, warehouse, mockUploader)
@@ -575,7 +575,7 @@ func TestIntegration(t *testing.T) {
 uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
 loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
-mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
+mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)
 
 d := deltalake.New(config.New(), logger.NOP, stats.NOP)
 err := d.Setup(ctx, warehouse, mockUploader)
@@ -597,10 +597,7 @@ func TestIntegration(t *testing.T) {
 uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
 loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
-mockUploader := newMockUploader(
-t, loadFiles, tableName, schemaInUpload, schemaInWarehouse,
-whutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z",
-)
+mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)
 
 d := deltalake.New(config.New(), logger.NOP, stats.NOP)
 err := d.Setup(ctx, warehouse, mockUploader)
@@ -648,7 +645,7 @@ func TestIntegration(t *testing.T) {
 uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
 
 loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
-mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, true, true, "2022-12-15T06:53:49.640Z")
+mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, true, true)
 
 d := deltalake.New(config.New(), logger.NOP, stats.NOP)
 err := d.Setup(ctx, warehouse, mockUploader)
@@ -698,7 +695,7 @@ func TestIntegration(t *testing.T) {
 uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
 
 loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
-mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, true, false, "2022-11-15T06:53:49.640Z")
+mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, true, false)
 
 appendWarehouse := th.Clone(t, warehouse)
 appendWarehouse.Destination.Config["preferAppend"] = false
@@ -749,7 +746,7 @@ func TestIntegration(t *testing.T) {
 uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/dedup.csv.gz", tableName)
 
 loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
-mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, true, false, "2022-11-15T06:53:49.640Z")
+mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, true, false)
 
 appendWarehouse := th.Clone(t, warehouse)
 appendWarehouse.Destination.Config["preferAppend"] = true
@@ -802,7 +799,7 @@ func TestIntegration(t *testing.T) {
 uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
 loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
-mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, true, false, "2022-12-15T06:53:49.640Z")
+mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, true, false)
 
 appendWarehouse := th.Clone(t, warehouse)
 appendWarehouse.Destination.Config[model.PreferAppendSetting.String()] = true
@@ -857,7 +854,7 @@ func TestIntegration(t *testing.T) {
 credentials.ContainerName,
 ),
 }}
-mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
+mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)
 
 d := deltalake.New(config.New(), logger.NOP, stats.NOP)
 err := d.Setup(ctx, warehouse, mockUploader)
@@ -882,7 +879,7 @@ func TestIntegration(t *testing.T) {
 uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/mismatch-columns.csv.gz", tableName)
 
 loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
-mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
+mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)
 
 d := deltalake.New(config.New(), logger.NOP, stats.NOP)
 err := d.Setup(ctx, warehouse, mockUploader)
@@ -926,7 +923,7 @@ func TestIntegration(t *testing.T) {
 uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/mismatch-schema.csv.gz", tableName)
 
 loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
-mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
+mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)
 
 d := deltalake.New(config.New(), logger.NOP, stats.NOP)
 err := d.Setup(ctx, warehouse, mockUploader)
@@ -970,7 +967,7 @@ func TestIntegration(t *testing.T) {
 uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/discards.csv.gz", tableName)
 
 loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
-mockUploader := newMockUploader(t, loadFiles, tableName, whutils.DiscardsSchema, whutils.DiscardsSchema, whutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
+mockUploader := newMockUploader(t, loadFiles, tableName, whutils.DiscardsSchema, whutils.DiscardsSchema, whutils.LoadFileTypeCsv, false, false)
 
 d := deltalake.New(config.New(), logger.NOP, stats.NOP)
 err := d.Setup(ctx, warehouse, mockUploader)
@@ -1014,7 +1011,7 @@ func TestIntegration(t *testing.T) {
 uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.parquet", tableName)
 
 loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
-mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeParquet, false, false, "2022-12-15T06:53:49.640Z")
+mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeParquet, false, false)
 
 d := deltalake.New(config.New(), logger.NOP, stats.NOP)
 err := d.Setup(ctx, warehouse, mockUploader)
@@ -1059,7 +1056,7 @@ func TestIntegration(t *testing.T) {
 uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
 loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
-mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
+mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)
 
 d := deltalake.New(config.New(), logger.NOP, stats.NOP)
 err := d.Setup(ctx, warehouse, mockUploader)
@@ -1120,7 +1117,7 @@ func TestIntegration(t *testing.T) {
 uploadOutput := whth.UploadLoadFile(t, fm, "../testdata/load.csv.gz", tableName)
 
 loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
-mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false, "2022-12-15T06:53:49.640Z")
+mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse, whutils.LoadFileTypeCsv, false, false)
 
 d := deltalake.New(config.New(), logger.NOP, stats.NOP)
 err := d.Setup(ctx, warehouse, mockUploader)
@@ -1431,14 +1428,10 @@ func newMockUploader(
 loadFileType string,
 canAppend bool,
 onDedupUseNewRecords bool,
-eventTS string,
 ) whutils.Uploader {
 ctrl := gomock.NewController(t)
 t.Cleanup(ctrl.Finish)
 
-firstLastEventTS, err := time.Parse(time.RFC3339, eventTS)
-require.NoError(t, err)
-
 mockUploader := mockuploader.NewMockUploader(ctrl)
 mockUploader.EXPECT().UseRudderStorage().Return(false).AnyTimes()
 mockUploader.EXPECT().ShouldOnDedupUseNewRecord().Return(onDedupUseNewRecords).AnyTimes()
@@ -1452,7 +1445,6 @@ func newMockUploader(
 mockUploader.EXPECT().GetTableSchemaInUpload(tableName).Return(schemaInUpload).AnyTimes()
 mockUploader.EXPECT().GetTableSchemaInWarehouse(tableName).Return(schemaInWarehouse).AnyTimes()
 mockUploader.EXPECT().GetLoadFileType().Return(loadFileType).AnyTimes()
-mockUploader.EXPECT().GetFirstLastEvent().Return(firstLastEventTS, firstLastEventTS).AnyTimes()
 
 return mockUploader
 }
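
Note on the newMockUploader change above: with eventTS gone, the mock no longer stubs GetFirstLastEvent, and gomock fails a test on any call that has no matching expectation, so the trimmed helper doubles as an assertion that the Delta Lake integration no longer reads first/last event timestamps. A minimal hand-rolled sketch of the same guarantee (hypothetical names, not the repository's generated mock):

package main

import (
	"fmt"
	"time"
)

// uploader is a trimmed, hypothetical stand-in for the warehouse Uploader interface.
type uploader interface {
	GetLoadFileType() string
	GetFirstLastEvent() (time.Time, time.Time)
}

// fakeUploader stubs only what the code under test is expected to call.
type fakeUploader struct{ loadFileType string }

func (f *fakeUploader) GetLoadFileType() string { return f.loadFileType }

// GetFirstLastEvent fails loudly if a leftover caller still uses it,
// mirroring gomock's "unexpected call" failure for un-stubbed methods.
func (f *fakeUploader) GetFirstLastEvent() (time.Time, time.Time) {
	panic("GetFirstLastEvent should no longer be called")
}

func main() {
	var u uploader = &fakeUploader{loadFileType: "csv"}
	fmt.Println(u.GetLoadFileType()) // prints "csv"; GetFirstLastEvent is never touched
}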
30 changes: 0 additions & 30 deletions warehouse/internal/mocks/utils/mock_uploader.go

Some generated files are not rendered by default.

13 changes: 0 additions & 13 deletions warehouse/internal/model/upload.go
@@ -143,19 +143,6 @@ func GetLastFailedStatus(timingsMap Timings) (status string) {
 return // zero values
 }
 
-func GetLoadFileGenTime(timingsMap Timings) (t time.Time) {
-if len(timingsMap) > 0 {
-for index := len(timingsMap) - 1; index >= 0; index-- {
-for s, t := range timingsMap[index] {
-if strings.Contains(s, GeneratingLoadFiles) {
-return t
-}
-}
-}
-}
-return // zero values
-}
-
 type AlterTableResponse struct {
 IsDependent bool // true if the column is dependent on another view or rules, false otherwise
 Query string
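
For context on the deletion above: Timings is an ordered slice of single-entry status-to-timestamp maps (the JSON fixture in the test removed below shows the shape), and the deleted helper performed the same newest-to-oldest scan that the surviving GetLastFailedStatus uses. A rough, self-contained illustration with hypothetical names:

package main

import (
	"fmt"
	"strings"
	"time"
)

// timings mirrors the shape of model.Timings: an ordered slice of
// single-entry status -> timestamp maps, appended as an upload progresses.
type timings []map[string]time.Time

// lastMatching walks newest-to-oldest and returns the timestamp of the first
// status containing substr, or the zero time when nothing matches.
func lastMatching(tm timings, substr string) time.Time {
	for i := len(tm) - 1; i >= 0; i-- {
		for status, at := range tm[i] {
			if strings.Contains(status, substr) {
				return at
			}
		}
	}
	return time.Time{}
}

func main() {
	now := time.Now()
	tm := timings{
		{"generating_load_files": now.Add(-2 * time.Minute)},
		{"exporting_data_failed": now},
	}
	fmt.Println(lastMatching(tm, "failed")) // timestamp of the failed status
}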
32 changes: 0 additions & 32 deletions warehouse/internal/model/upload_test.go
@@ -3,44 +3,12 @@ package model_test
 import (
 "encoding/json"
 "testing"
-"time"
 
 "github.com/stretchr/testify/require"
 
-"github.com/rudderlabs/rudder-server/utils/misc"
 "github.com/rudderlabs/rudder-server/warehouse/internal/model"
 )
 
-func TestGetLoadFileGenTime(t *testing.T) {
-inputs := []struct {
-timingsRaw string
-loadFilesEpochStr string
-}{
-{
-timingsRaw: "[{\"generating_upload_schema\":\"2022-07-04T16:09:03.001Z\"},{\"generated_upload_schema\":\"2022-07-04T16:09:04.141Z\"},{\"creating_table_uploads\":\"2022-07-04T16:09:04.144Z\"},{\"created_table_uploads\":\"2022-07-04T16:09:04.164Z\"},{\"generating_load_files\":\"2022-07-04T16:09:04.169Z\"},{\"generated_load_files\":\"2022-07-04T16:09:40.957Z\"},{\"updating_table_uploads_counts\":\"2022-07-04T16:09:40.959Z\"},{\"updated_table_uploads_counts\":\"2022-07-04T16:09:41.916Z\"},{\"creating_remote_schema\":\"2022-07-04T16:09:41.918Z\"},{\"created_remote_schema\":\"2022-07-04T16:09:41.920Z\"},{\"exporting_data\":\"2022-07-04T16:09:41.922Z\"},{\"exporting_data_failed\":\"2022-07-04T17:14:24.424Z\"}]",
-loadFilesEpochStr: "2022-07-04T16:09:04.169Z",
-},
-{
-timingsRaw: "[]",
-loadFilesEpochStr: "0001-01-01T00:00:00.000Z",
-},
-{
-timingsRaw: "[{\"generating_upload_schema\":\"2022-07-04T16:09:03.001Z\"},{\"generated_upload_schema\":\"2022-07-04T16:09:04.141Z\"},{\"creating_table_uploads\":\"2022-07-04T16:09:04.144Z\"},{\"created_table_uploads\":\"2022-07-04T16:09:04.164Z\"}]",
-loadFilesEpochStr: "0001-01-01T00:00:00.000Z",
-},
-}
-for _, input := range inputs {
-loadFilesEpochTime, err := time.Parse(misc.RFC3339Milli, input.loadFilesEpochStr)
-require.NoError(t, err)
-
-var timing model.Timings
-require.NoError(t, json.Unmarshal([]byte(input.timingsRaw), &timing))
-
-loadFileGenTime := model.GetLoadFileGenTime(timing)
-require.Equal(t, loadFilesEpochTime, loadFileGenTime)
-}
-}
-
 func TestGetLastFailedStatus(t *testing.T) {
 inputs := []struct {
 timingsRaw string
7 changes: 4 additions & 3 deletions warehouse/multitenant/manager.go
@@ -6,6 +6,7 @@ import (
 "sync"
 
 "github.com/rudderlabs/rudder-go-kit/config"
+
 backendconfig "github.com/rudderlabs/rudder-server/backend-config"
 )
 
@@ -49,9 +50,9 @@ func (m *Manager) Run(ctx context.Context) {
 chIn := m.backendConfig.Subscribe(ctx, backendconfig.TopicBackendConfig)
 for data := range chIn {
 m.sourceMu.Lock()
-config := data.Data.(map[string]backendconfig.ConfigT)
-for workspaceID := range config {
-for _, source := range config[workspaceID].Sources {
+wConfig := data.Data.(map[string]backendconfig.ConfigT)
+for workspaceID := range wConfig {
+for _, source := range wConfig[workspaceID].Sources {
 m.sourceIDToWorkspaceID[source.ID] = workspaceID
 }
 }
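
The wConfig rename in Run is more than style: the old local config shadowed the imported rudder-go-kit config package for the rest of the scope, so nothing below it could call into that package. A minimal stand-in using a standard-library package shows the pitfall:

package main

import (
	"fmt"
	"strings"
)

func main() {
	fmt.Println(strings.ToUpper("ok")) // the strings package is visible here

	// A local variable named after an imported package shadows it
	// for the remainder of the scope.
	strings := map[string]string{"workspace-1": "cfg"}
	for workspaceID := range strings {
		// strings.ToUpper(workspaceID) would no longer compile here:
		// the identifier now refers to the map, not the package.
		fmt.Println(workspaceID)
	}
}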
2 changes: 1 addition & 1 deletion warehouse/router/router.go
@@ -537,7 +537,7 @@ func (r *Router) mainLoop(ctx context.Context) {
 }
 
 func (r *Router) createJobs(ctx context.Context, warehouse model.Warehouse) (err error) {
-if ok, err := r.canCreateUpload(ctx, warehouse); !ok {
+if err := r.canCreateUpload(ctx, warehouse); err != nil {
 r.statsFactory.NewTaggedStat("wh_scheduler.upload_sync_skipped", stats.CountType, stats.Tags{
 "workspaceId": warehouse.WorkspaceID,
 "destinationID": warehouse.Destination.ID,
28 changes: 17 additions & 11 deletions warehouse/router/scheduling.go
@@ -13,58 +13,64 @@ import (
 whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
 )
 
+var (
+errUploadFrequencyExceeded = fmt.Errorf("upload frequency exceeded")
+errCurrentTimeExistsInExcludeWindow = fmt.Errorf("current time exists in exclude window")
+errBeforeScheduledTime = fmt.Errorf("before scheduled time")
+)
+
 type createUploadAlwaysLoader interface {
 Load() bool
 }
 
 // canCreateUpload indicates if an upload can be started now for the warehouse based on its configured schedule
-func (r *Router) canCreateUpload(ctx context.Context, warehouse model.Warehouse) (bool, error) {
+func (r *Router) canCreateUpload(ctx context.Context, warehouse model.Warehouse) error {
 // can be set from rudder-cli to force uploads always
 if r.createUploadAlways.Load() {
-return true, nil
+return nil
 }
 
 // return true if the upload was triggered
 if _, isTriggered := r.triggerStore.Load(warehouse.Identifier); isTriggered {
-return true, nil
+return nil
 }
 
 if r.config.warehouseSyncFreqIgnore.Load() {
 if r.uploadFrequencyExceeded(warehouse, "") {
-return true, nil
+return nil
 }
-return false, fmt.Errorf("ignore sync freq: upload frequency exceeded")
+return errUploadFrequencyExceeded
 }
 
 // gets exclude window start time and end time
 excludeWindow := warehouse.GetMapDestinationConfig(model.ExcludeWindowSetting)
 excludeWindowStartTime, excludeWindowEndTime := excludeWindowStartEndTimes(excludeWindow)
 
 if checkCurrentTimeExistsInExcludeWindow(r.now().UTC(), excludeWindowStartTime, excludeWindowEndTime) {
-return false, fmt.Errorf("exclude window: current time exists in exclude window")
+return errCurrentTimeExistsInExcludeWindow
 }
 
 syncFrequency := warehouse.GetStringDestinationConfig(r.conf, model.SyncFrequencySetting)
 syncStartAt := warehouse.GetStringDestinationConfig(r.conf, model.SyncStartAtSetting)
 if syncFrequency == "" || syncStartAt == "" {
 if r.uploadFrequencyExceeded(warehouse, syncFrequency) {
-return true, nil
+return nil
 }
-return false, fmt.Errorf("upload frequency exceeded")
+return errUploadFrequencyExceeded
 }
 
 prevScheduledTime := r.prevScheduledTime(syncFrequency, syncStartAt, r.now())
 lastUploadCreatedAt, err := r.uploadRepo.LastCreatedAt(ctx, warehouse.Source.ID, warehouse.Destination.ID)
 if err != nil {
-return false, err
+return err
 }
 
 // start upload only if no upload has started in current window
 // e.g. with prev scheduled time 14:00 and current time 15:00, start only if prev upload hasn't started after 14:00
 if lastUploadCreatedAt.Before(prevScheduledTime) {
-return true, nil
+return nil
 }
-return false, fmt.Errorf("before scheduled time")
+return errBeforeScheduledTime
 }
 
 func excludeWindowStartEndTimes(excludeWindow map[string]interface{}) (string, string) {
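
The sentinel errors introduced above make the skip reason from canCreateUpload comparable with errors.Is instead of string matching; the createJobs call site in router.go just counts the skip, but a caller could branch on the specific reason. A hedged sketch with stand-in declarations (not code from the repository):

package main

import (
	"errors"
	"fmt"
)

// Stand-ins mirroring the sentinels declared in scheduling.go.
var (
	errUploadFrequencyExceeded          = errors.New("upload frequency exceeded")
	errCurrentTimeExistsInExcludeWindow = errors.New("current time exists in exclude window")
	errBeforeScheduledTime              = errors.New("before scheduled time")
)

// canCreateUpload is a stub that returns one of the skip reasons.
func canCreateUpload() error { return errBeforeScheduledTime }

func main() {
	if err := canCreateUpload(); err != nil {
		switch {
		case errors.Is(err, errUploadFrequencyExceeded),
			errors.Is(err, errCurrentTimeExistsInExcludeWindow),
			errors.Is(err, errBeforeScheduledTime):
			fmt.Println("skipping upload:", err) // expected scheduling skip
		default:
			fmt.Println("checking schedule failed:", err) // real failure, e.g. from LastCreatedAt
		}
	}
}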
(Diffs for the remaining 6 changed files are not rendered here.)