Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: use GCSUploadIfNotExist setting for file manager #4716

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions archiver/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"path"
"time"

"github.com/google/uuid"

"github.com/samber/lo"
"github.com/tidwall/gjson"

Expand Down Expand Up @@ -155,7 +153,7 @@ func (w *worker) uploadJobs(ctx context.Context, jobs []*jobsdb.JobT) (string, e
lo.Must(misc.CreateTMPDIR()),
"rudder-backups",
w.sourceID,
fmt.Sprintf("%d_%d_%s_%s.json.gz", firstJobCreatedAt.Unix(), lastJobCreatedAt.Unix(), workspaceID, uuid.NewString()),
fmt.Sprintf("%d_%d_%s.json.gz", firstJobCreatedAt.Unix(), lastJobCreatedAt.Unix(), workspaceID),
)

for _, job := range jobs {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ require (
github.com/rudderlabs/analytics-go v3.3.3+incompatible
github.com/rudderlabs/bing-ads-go-sdk v0.2.1
github.com/rudderlabs/compose-test v0.1.3
github.com/rudderlabs/rudder-go-kit v0.32.1
github.com/rudderlabs/rudder-go-kit v0.32.2
github.com/rudderlabs/rudder-observability-kit v0.0.3
github.com/rudderlabs/rudder-schemas v0.4.0
github.com/rudderlabs/sql-tunnels v0.1.6
Expand Down
15 changes: 13 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,8 @@ github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fsouza/fake-gcs-server v1.49.0 h1:4x1RxKuqoqhZrXogtj5nInQnIjQylxld43tKrkPHnmE=
github.com/fsouza/fake-gcs-server v1.49.0/go.mod h1:FJYZxdHQk2nGxrczFjLbDv8h6SnYXxSxcnM14eeespA=
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
Expand Down Expand Up @@ -610,7 +612,10 @@ github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 h1:k7nVchz72niMH6YLQNvHSdIE7iqsQxK1P41mySCvssg=
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
github.com/google/renameio v0.1.0 h1:GOZbcHa3HfsPKPlmyPyN2KEohoMXOhdMbHrvbpl2QaA=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/renameio/v2 v2.0.0 h1:UifI23ZTGY8Tt29JbYFiuyIU3eX+RNFtUwefq9qAhxg=
github.com/google/renameio/v2 v2.0.0/go.mod h1:BtmJXm5YlszgC+TD4HOEEUFgkJP3nLxehU6hfe7jRt4=
github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
Expand All @@ -632,7 +637,11 @@ github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0
github.com/googleapis/gax-go/v2 v2.2.0/go.mod h1:as02EH8zWkzwUoLbBaFeQ+arQaj/OthfcblKl4IGNaM=
github.com/googleapis/gax-go/v2 v2.12.4 h1:9gWcmF85Wvq4ryPFvGFaOgPIs1AQX0d0bcbGw4Z96qg=
github.com/googleapis/gax-go/v2 v2.12.4/go.mod h1:KYEYLorsnIGDi/rPC8b5TdlB9kbKoFubselGIoBMCwI=
github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE=
github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
Expand Down Expand Up @@ -913,6 +922,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo=
github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk=
github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE=
github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down Expand Up @@ -960,8 +971,8 @@ github.com/rudderlabs/compose-test v0.1.3 h1:uyep6jDCIF737sfv4zIaMsKRQKX95IDz5Xb
github.com/rudderlabs/compose-test v0.1.3/go.mod h1:tuvS1eQdSfwOYv1qwyVAcpdJxPLQXJgy5xGDd/9XmMg=
github.com/rudderlabs/parquet-go v0.0.2 h1:ZXRdZdimB0PdJtmxeSSxfI0fDQ3kZjwzBxRi6Ut1J8k=
github.com/rudderlabs/parquet-go v0.0.2/go.mod h1:g6guum7o8uhj/uNhunnt7bw5Vabu/goI5i21/3fnxWQ=
github.com/rudderlabs/rudder-go-kit v0.32.1 h1:jshXstlMrsZdYZlxKcfVDz+fIYw91eYMFALPpy1jzLg=
github.com/rudderlabs/rudder-go-kit v0.32.1/go.mod h1:Ywgp3dgbN+EUe813J4i9dVZ+Px50ER95QH+yK3iSDM4=
github.com/rudderlabs/rudder-go-kit v0.32.2 h1:BPSxudYrSg83rv/ebDs9bC9E8dPpy7BCNCvJomZI+bw=
github.com/rudderlabs/rudder-go-kit v0.32.2/go.mod h1:U9NpJmwTAeMIba5MlwnX8ckMr/Uzh7EXTEaXN3RhJh8=
github.com/rudderlabs/rudder-observability-kit v0.0.3 h1:vZtuZRkGX+6rjaeKtxxFE2YYP6QlmAcVcgecTOjvz+Q=
github.com/rudderlabs/rudder-observability-kit v0.0.3/go.mod h1:6UjAh3H6rkE0fFLh7z8ZGQEQbKtUkRfhWOf/OUhfqW8=
github.com/rudderlabs/rudder-schemas v0.4.0 h1:CkpmO8NI0OFMcFyD6WAGN/gIsmqyrC+Bd/htb9eBhpo=
Expand Down
2 changes: 1 addition & 1 deletion processor/stash/stash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestStoreErrorsToObjectStorage(t *testing.T) {

errJobs = st.storeErrorsToObjectStorage(jobsToFail)
require.Equal(t, 1, len(errJobs))
require.Equal(t, errJobs[0].errorOutput.Error, fileuploader.NoStorageForWorkspaceError)
require.Equal(t, errJobs[0].errorOutput.Error, fileuploader.ErrNoStorageForWorkspace)
}

func countJobsByWorkspace(jobs []*jobsdb.JobT) map[string]int {
Expand Down
39 changes: 30 additions & 9 deletions services/fileuploader/fileuploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import (
"context"
"errors"
"fmt"
"os"
"sync"

"github.com/rudderlabs/rudder-go-kit/config"
Expand All @@ -16,12 +18,28 @@
Preferences backendconfig.StoragePreferences
}

var NoStorageForWorkspaceError = fmt.Errorf("no storage settings found for workspace")
type FileManager struct {
filemanager.FileManager
}

func (f *FileManager) Upload(ctx context.Context, file *os.File, prefixes ...string) (filemanager.UploadedFile, error) {
uploadedFile, err := f.FileManager.Upload(ctx, file, prefixes...)
switch {
case err == nil:
return uploadedFile, nil
case errors.Is(err, filemanager.ErrPreConditionFailed):
return filemanager.UploadedFile{Location: "already exists, not uploaded"}, nil
default:
return filemanager.UploadedFile{}, err

Check warning on line 33 in services/fileuploader/fileuploader.go

View check run for this annotation

Codecov / codecov/patch

services/fileuploader/fileuploader.go#L30-L33

Added lines #L30 - L33 were not covered by tests
}
}

var ErrNoStorageForWorkspace = fmt.Errorf("no storage settings found for workspace")

// Provider is an interface that provides file managers and storage preferences for a given workspace.
type Provider interface {
// Gets a file manager for the given workspace.
GetFileManager(workspaceID string) (filemanager.FileManager, error)
GetFileManager(workspaceID string) (*FileManager, error)
// Gets the storage preferences for the given workspace.
GetStoragePreferences(workspaceID string) (backendconfig.StoragePreferences, error)
}
Expand Down Expand Up @@ -67,20 +85,22 @@
defer p.mu.RUnlock()
settings, ok := p.storageSettings[workspaceID]
if !ok {
return StorageSettings{}, NoStorageForWorkspaceError
return StorageSettings{}, ErrNoStorageForWorkspace
}
return settings, nil
}

func (p *provider) GetFileManager(workspaceID string) (filemanager.FileManager, error) {
func (p *provider) GetFileManager(workspaceID string) (*FileManager, error) {
settings, err := p.getStorageSettings(workspaceID)
if err != nil {
return nil, err
}
return filemanager.New(&filemanager.Settings{
Provider: settings.Bucket.Type,
Config: settings.Bucket.Config,
fm, err := filemanager.New(&filemanager.Settings{
Provider: settings.Bucket.Type,
Config: settings.Bucket.Config,
GCSUploadIfNotExist: true,
})
return &FileManager{fm}, err
}

func (p *provider) GetStoragePreferences(workspaceID string) (backendconfig.StoragePreferences, error) {
Expand Down Expand Up @@ -141,12 +161,13 @@

type defaultProvider struct{}

func (*defaultProvider) GetFileManager(_ string) (filemanager.FileManager, error) {
func (*defaultProvider) GetFileManager(_ string) (*FileManager, error) {
defaultConfig := getDefaultBucket(context.Background(), config.GetString("JOBS_BACKUP_STORAGE_PROVIDER", "S3"))
return filemanager.New(&filemanager.Settings{
fm, err := filemanager.New(&filemanager.Settings{
Provider: defaultConfig.Type,
Config: defaultConfig.Config,
})
return &FileManager{fm}, err
}

func (*defaultProvider) GetStoragePreferences(_ string) (backendconfig.StoragePreferences, error) {
Expand Down
4 changes: 2 additions & 2 deletions services/fileuploader/fileuploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestFileUploaderUpdatingWithConfigBackend(t *testing.T) {
Expect(fm3.Prefix()).To(Equal("defaultPrefixWithStorageTTL"))

fm0, err := fileUploaderProvider.GetFileManager("testWorkspaceId-0")
Expect(err).To(Equal(NoStorageForWorkspaceError))
Expect(err).To(Equal(ErrNoStorageForWorkspace))
Expect(fm0).To(BeNil())

storageSettings.Wait()
Expand All @@ -136,7 +136,7 @@ func TestFileUploaderUpdatingWithConfigBackend(t *testing.T) {
))

preferences, err = fileUploaderProvider.GetStoragePreferences("testWorkspaceId-0")
Expect(err).To(Equal(NoStorageForWorkspaceError))
Expect(err).To(Equal(ErrNoStorageForWorkspace))
Expect(preferences).To(BeEquivalentTo(backendconfig.StoragePreferences{}))

preferences, err = fileUploaderProvider.GetStoragePreferences("testWorkspaceId-2")
Expand Down
Loading