From 598a4c1a88e9ae6eefc8a49bd9ebb9f3d4414638 Mon Sep 17 00:00:00 2001 From: Alan Kutniewski Date: Mon, 22 Jul 2024 11:46:30 +0200 Subject: [PATCH 1/7] Add support for blob type sources. Signed-off-by: Alan Kutniewski --- core/go.mod | 5 + core/go.sum | 9 ++ core/pkg/sync/blob/blob_sync.go | 144 +++++++++++++++++++++++++++ core/pkg/sync/blob/blob_sync_test.go | 127 +++++++++++++++++++++++ core/pkg/sync/blob/mock_blob.go | 73 ++++++++++++++ core/pkg/sync/http/http_sync_test.go | 3 +- core/pkg/sync/http/mock/http.go | 61 ------------ core/pkg/sync/testing/mock_cron.go | 74 ++++++++++++++ 8 files changed, 434 insertions(+), 62 deletions(-) create mode 100644 core/pkg/sync/blob/blob_sync.go create mode 100644 core/pkg/sync/blob/blob_sync_test.go create mode 100644 core/pkg/sync/blob/mock_blob.go create mode 100644 core/pkg/sync/testing/mock_cron.go diff --git a/core/go.mod b/core/go.mod index 5fb62a9fe..887a97c9a 100644 --- a/core/go.mod +++ b/core/go.mod @@ -64,6 +64,7 @@ require ( github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/googleapis/gax-go/v2 v2.12.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/huandu/xstrings v1.4.0 // indirect github.com/imdario/mergo v0.3.16 // indirect @@ -82,15 +83,19 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect + go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect + gocloud.dev v0.37.0 // indirect golang.org/x/net v0.26.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/term v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect + golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect + google.golang.org/api v0.169.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect google.golang.org/protobuf v1.34.2 // indirect diff --git a/core/go.sum b/core/go.sum index aad35ef19..598c2a025 100644 --- a/core/go.sum +++ b/core/go.sum @@ -1404,6 +1404,8 @@ github.com/googleapis/gax-go/v2 v2.8.0/go.mod h1:4orTrqY6hXxxaUL4LHIPl6lGo8vAE38 github.com/googleapis/gax-go/v2 v2.10.0/go.mod h1:4UOEnMCrxsSqQ940WnTiD6qJ63le2ev3xfyagutxiPw= github.com/googleapis/gax-go/v2 v2.11.0/go.mod h1:DxmR61SGKkGLa2xigwuZIQpkCI2S5iydzRfb3peWZJI= github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= +github.com/googleapis/gax-go/v2 v2.12.2 h1:mhN09QQW1jEWeMF74zGR81R30z4VJzjZsfkUhuHF+DA= +github.com/googleapis/gax-go/v2 v2.12.2/go.mod h1:61M8vcyyXR2kqKFxKrfA22jaA8JGF7Dc8App1U3H6jc= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= @@ -1572,6 +1574,7 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs= go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4= @@ -1642,6 +1645,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +gocloud.dev v0.37.0 h1:XF1rN6R0qZI/9DYjN16Uy0durAmSlf58DHOcb28GPro= +gocloud.dev v0.37.0/go.mod h1:7/O4kqdInCNsc6LqgmuFnS0GRew4XNNYWpA44yQnwco= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -2086,6 +2091,8 @@ golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= gomodules.xyz/jsonpatch/v2 v2.4.0 h1:Ci3iUJyx9UeRx7CeFN8ARgGbkESwJK+KB9lLcWxY/Zw= gomodules.xyz/jsonpatch/v2 v2.4.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= @@ -2161,6 +2168,8 @@ google.golang.org/api v0.126.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvy google.golang.org/api v0.128.0/go.mod h1:Y611qgqaE92On/7g65MQgxYul3c0rEB894kniWLY750= google.golang.org/api v0.139.0/go.mod h1:CVagp6Eekz9CjGZ718Z+sloknzkDJE7Vc1Ckj9+viBk= google.golang.org/api v0.149.0/go.mod h1:Mwn1B7JTXrzXtnvmzQE2BD6bYZQ8DShKZDZbeN9I7qI= +google.golang.org/api v0.169.0 h1:QwWPy71FgMWqJN/l6jVlFHUa29a7dcUy02I8o799nPY= +google.golang.org/api v0.169.0/go.mod h1:gpNOiMA2tZ4mf5R9Iwf4rK/Dcz0fbdIgWYWVoxmsyLg= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= diff --git a/core/pkg/sync/blob/blob_sync.go b/core/pkg/sync/blob/blob_sync.go new file mode 100644 index 000000000..ed53e8be7 --- /dev/null +++ b/core/pkg/sync/blob/blob_sync.go @@ -0,0 +1,144 @@ +package blob + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "time" + + "github.com/open-feature/flagd/core/pkg/logger" + "github.com/open-feature/flagd/core/pkg/sync" + "gocloud.dev/blob" + //nolint:gosec +) + +type Sync struct { + Bucket string + Object string + BlobURLMux *blob.URLMux + Cron Cron + Logger *logger.Logger + Interval uint32 + ready bool + lastUpdated time.Time +} + +// Cron defines the behaviour required of a cron +type Cron interface { + AddFunc(spec string, cmd func()) error + Start() + Stop() +} + +func (hs *Sync) Init(ctx context.Context) error { + return nil +} + +func (hs *Sync) IsReady() bool { + return hs.ready +} + +func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { + hs.Logger.Info(fmt.Sprintf("starting sync from %s/%s with interval %d", hs.Bucket, hs.Object, hs.Interval)) + // Initial fetch + hs.Logger.Debug(fmt.Sprintf("initial sync of the %s/%s", hs.Bucket, hs.Object)) + err := hs.ReSync(ctx, dataSync) + if err != nil { + return err + } + hs.ready = true + + hs.Logger.Debug(fmt.Sprintf("polling %s/%s every %d seconds", hs.Bucket, hs.Object, hs.Interval)) + _ = hs.Cron.AddFunc(fmt.Sprintf("*/%d * * * *", hs.Interval), func() { + hs.Logger.Debug(fmt.Sprintf("fetching configuration from %s/%s", hs.Bucket, hs.Object)) + bucket, err := hs.getBucket(ctx) + if err != nil { + hs.Logger.Warn(fmt.Sprintf("couldn't get bucket: %v", err)) + return + } + defer bucket.Close() + updated, err := hs.fetchObjectModificationTime(ctx, bucket) + if err != nil { + hs.Logger.Warn(fmt.Sprintf("couldn't get object attributes: %v", err)) + return + } + if hs.lastUpdated == updated { + hs.Logger.Debug("configuration hasn't changed, skipping fetching full object") + return + } + msg, err := hs.fetchObject(ctx, bucket) + if err != nil { + hs.Logger.Warn(fmt.Sprintf("couldn't get object: %v", err)) + return + } + hs.Logger.Info(fmt.Sprintf("configuration updated: %s", msg)) + dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object, Type: sync.ALL} + hs.lastUpdated = updated + }) + + hs.Cron.Start() + + <-ctx.Done() + hs.Cron.Stop() + + return nil +} + +func (hs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error { + bucket, err := hs.getBucket(ctx) + if err != nil { + return err + } + defer bucket.Close() + updated, err := hs.fetchObjectModificationTime(ctx, bucket) + if err != nil { + return err + } + msg, err := hs.fetchObject(ctx, bucket) + if err != nil { + return err + } + hs.Logger.Info(fmt.Sprintf("configuration updated: %s", msg)) + dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object, Type: sync.ALL} + hs.lastUpdated = updated + return nil +} + +func (hs *Sync) getBucket(ctx context.Context) (*blob.Bucket, error) { + if hs.Bucket == "" { + return nil, errors.New("no bucket string set") + } + return hs.BlobURLMux.OpenBucket(ctx, hs.Bucket) +} + +func (hs *Sync) fetchObjectModificationTime(ctx context.Context, bucket *blob.Bucket) (time.Time, error) { + if hs.Object == "" { + return time.Time{}, errors.New("no object string set") + } + attrs, err := bucket.Attributes(ctx, hs.Object) + if err != nil { + return time.Time{}, fmt.Errorf("error fetching attributes for object %s/%s: %w", hs.Bucket, hs.Object, err) + } + return attrs.ModTime, nil +} + +func (hs *Sync) fetchObject(ctx context.Context, bucket *blob.Bucket) (string, error) { + if hs.Object == "" { + return "", errors.New("no object string set") + } + r, err := bucket.NewReader(ctx, hs.Object, nil) + if err != nil { + return "", fmt.Errorf("error creating reader for object %s/%s: %w", hs.Bucket, hs.Object, err) + } + defer r.Close() + + buf := bytes.NewBuffer(nil) + _, err = io.Copy(buf, r) + if err != nil { + return "", fmt.Errorf("error reading object %s/%s: %w", hs.Bucket, hs.Object, err) + } + + return string(buf.Bytes()), nil +} diff --git a/core/pkg/sync/blob/blob_sync_test.go b/core/pkg/sync/blob/blob_sync_test.go new file mode 100644 index 000000000..8af9c18c3 --- /dev/null +++ b/core/pkg/sync/blob/blob_sync_test.go @@ -0,0 +1,127 @@ +package blob + +import ( + "context" + "log" + "testing" + "time" + + "github.com/open-feature/flagd/core/pkg/logger" + "github.com/open-feature/flagd/core/pkg/sync" + synctesting "github.com/open-feature/flagd/core/pkg/sync/testing" + "go.uber.org/mock/gomock" +) + +const ( + scheme = "xyz" + bucket = "b" + object = "o" +) + +func TestSync(t *testing.T) { + ctrl := gomock.NewController(t) + mockCron := synctesting.NewMockCron(ctrl) + mockCron.EXPECT().AddFunc(gomock.Any(), gomock.Any()).DoAndReturn(func(spec string, cmd func()) error { + return nil + }) + mockCron.EXPECT().Start().Times(1) + + blobSync := &Sync{ + Bucket: scheme + "://" + bucket, + Object: object, + Cron: mockCron, + Logger: logger.NewLogger(nil, false), + } + blobMock := NewMockBlob(scheme, func() *Sync { + return blobSync + }) + blobSync.BlobURLMux = blobMock.URLMux() + + ctx := context.Background() + dataSyncChan := make(chan sync.DataSync, 1) + + config := "my-config" + blobMock.AddObject(object, config) + + go func() { + err := blobSync.Sync(ctx, dataSyncChan) + if err != nil { + log.Fatalf("Error start sync: %s", err.Error()) + return + } + }() + + data := <-dataSyncChan // initial sync + if data.FlagData != config { + t.Errorf("expected content: %s, but received content: %s", config, data.FlagData) + } + tickWithConfigChange(t, mockCron, dataSyncChan, blobMock, "new config") + tickWithoutConfigChange(t, mockCron, dataSyncChan) + tickWithConfigChange(t, mockCron, dataSyncChan, blobMock, "new config 2") + tickWithoutConfigChange(t, mockCron, dataSyncChan) + tickWithoutConfigChange(t, mockCron, dataSyncChan) +} + +func tickWithConfigChange(t *testing.T, mockCron *synctesting.MockCron, dataSyncChan chan sync.DataSync, blobMock *MockBlob, newConfig string) { + time.Sleep(time.Millisecond) // sleep so the new file has different modification date + blobMock.AddObject(object, newConfig) + mockCron.Tick() + select { + case data, ok := <-dataSyncChan: + if ok { + if data.FlagData != newConfig { + t.Errorf("expected content: %s, but received content: %s", newConfig, data.FlagData) + } + } else { + t.Errorf("data channel unexpecdly closed") + } + default: + t.Errorf("data channel has no expected update") + } +} + +func tickWithoutConfigChange(t *testing.T, mockCron *synctesting.MockCron, dataSyncChan chan sync.DataSync) { + mockCron.Tick() + select { + case data, ok := <-dataSyncChan: + if ok { + t.Errorf("unexpected update: %s", data.FlagData) + } else { + t.Errorf("data channel unexpecdly closed") + } + default: + } +} + +func TestReSync(t *testing.T) { + ctrl := gomock.NewController(t) + mockCron := synctesting.NewMockCron(ctrl) + + blobSync := &Sync{ + Bucket: scheme + "://" + bucket, + Object: object, + Cron: mockCron, + Logger: logger.NewLogger(nil, false), + } + blobMock := NewMockBlob(scheme, func() *Sync { + return blobSync + }) + blobSync.BlobURLMux = blobMock.URLMux() + + ctx := context.Background() + dataSyncChan := make(chan sync.DataSync, 1) + + config := "my-config" + blobMock.AddObject(object, config) + + err := blobSync.ReSync(ctx, dataSyncChan) + if err != nil { + log.Fatalf("Error start sync: %s", err.Error()) + return + } + + data := <-dataSyncChan + if data.FlagData != config { + t.Errorf("expected content: %s, but received content: %s", config, data.FlagData) + } +} diff --git a/core/pkg/sync/blob/mock_blob.go b/core/pkg/sync/blob/mock_blob.go new file mode 100644 index 000000000..c0b4ea0b7 --- /dev/null +++ b/core/pkg/sync/blob/mock_blob.go @@ -0,0 +1,73 @@ +package blob + +import ( + "context" + "log" + "net/url" + + "gocloud.dev/blob" + "gocloud.dev/blob/memblob" +) + +type MockBlob struct { + mux *blob.URLMux + bucket *blob.Bucket + scheme string + opener *fakeOpener +} + +type fakeOpener struct { + object string + content string + keepModTime bool + getSync func() *Sync +} + +func (f *fakeOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) { + bucketUrl, err := url.Parse("mem://") + if err != nil { + log.Fatalf("couldn't parse url: %s: %v", "mem://", err) + } + opener := &memblob.URLOpener{} + bucket, err := opener.OpenBucketURL(context.Background(), bucketUrl) + if err != nil { + log.Fatalf("couldn't open in memory bucket: %v", err) + } + if f.object != "" { + err = bucket.WriteAll(ctx, f.object, []byte(f.content), nil) + if err != nil { + log.Fatalf("couldn't write in memory file: %v", err) + } + } + if f.keepModTime && f.object != "" { + attrs, err := bucket.Attributes(ctx, f.object) + if err != nil { + log.Fatalf("couldn't get memory file attributes: %v", err) + } + f.getSync().lastUpdated = attrs.ModTime + } else { + f.keepModTime = true + } + return bucket, nil +} + +func NewMockBlob(scheme string, getSync func() *Sync) *MockBlob { + mux := new(blob.URLMux) + opener := &fakeOpener{getSync: getSync} + mux.RegisterBucket(scheme, opener) + return &MockBlob{ + mux: mux, + scheme: scheme, + opener: opener, + } +} + +func (mb *MockBlob) URLMux() *blob.URLMux { + return mb.mux +} + +func (mb *MockBlob) AddObject(object, content string) { + mb.opener.object = object + mb.opener.content = content + mb.opener.keepModTime = false +} diff --git a/core/pkg/sync/http/http_sync_test.go b/core/pkg/sync/http/http_sync_test.go index 0563c11f8..5005a7f0f 100644 --- a/core/pkg/sync/http/http_sync_test.go +++ b/core/pkg/sync/http/http_sync_test.go @@ -13,6 +13,7 @@ import ( "github.com/open-feature/flagd/core/pkg/logger" "github.com/open-feature/flagd/core/pkg/sync" syncmock "github.com/open-feature/flagd/core/pkg/sync/http/mock" + synctesting "github.com/open-feature/flagd/core/pkg/sync/testing" "go.uber.org/mock/gomock" ) @@ -20,7 +21,7 @@ func TestSimpleSync(t *testing.T) { ctrl := gomock.NewController(t) resp := "test response" - mockCron := syncmock.NewMockCron(ctrl) + mockCron := synctesting.NewMockCron(ctrl) mockCron.EXPECT().AddFunc(gomock.Any(), gomock.Any()).DoAndReturn(func(spec string, cmd func()) error { return nil }) diff --git a/core/pkg/sync/http/mock/http.go b/core/pkg/sync/http/mock/http.go index b5e5322c9..a6c4a5efe 100644 --- a/core/pkg/sync/http/mock/http.go +++ b/core/pkg/sync/http/mock/http.go @@ -53,64 +53,3 @@ func (mr *MockClientMockRecorder) Do(req any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Do", reflect.TypeOf((*MockClient)(nil).Do), req) } - -// MockCron is a mock of Cron interface. -type MockCron struct { - ctrl *gomock.Controller - recorder *MockCronMockRecorder -} - -// MockCronMockRecorder is the mock recorder for MockCron. -type MockCronMockRecorder struct { - mock *MockCron -} - -// NewMockCron creates a new mock instance. -func NewMockCron(ctrl *gomock.Controller) *MockCron { - mock := &MockCron{ctrl: ctrl} - mock.recorder = &MockCronMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockCron) EXPECT() *MockCronMockRecorder { - return m.recorder -} - -// AddFunc mocks base method. -func (m *MockCron) AddFunc(spec string, cmd func()) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddFunc", spec, cmd) - ret0, _ := ret[0].(error) - return ret0 -} - -// AddFunc indicates an expected call of AddFunc. -func (mr *MockCronMockRecorder) AddFunc(spec, cmd any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddFunc", reflect.TypeOf((*MockCron)(nil).AddFunc), spec, cmd) -} - -// Start mocks base method. -func (m *MockCron) Start() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Start") -} - -// Start indicates an expected call of Start. -func (mr *MockCronMockRecorder) Start() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockCron)(nil).Start)) -} - -// Stop mocks base method. -func (m *MockCron) Stop() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Stop") -} - -// Stop indicates an expected call of Stop. -func (mr *MockCronMockRecorder) Stop() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockCron)(nil).Stop)) -} diff --git a/core/pkg/sync/testing/mock_cron.go b/core/pkg/sync/testing/mock_cron.go new file mode 100644 index 000000000..4039208ee --- /dev/null +++ b/core/pkg/sync/testing/mock_cron.go @@ -0,0 +1,74 @@ +package testing + +import ( + "reflect" + + "go.uber.org/mock/gomock" +) + +// MockCron is a mock of Cron interface. +type MockCron struct { + ctrl *gomock.Controller + recorder *MockCronMockRecorder + cmd func() +} + +// MockCronMockRecorder is the mock recorder for MockCron. +type MockCronMockRecorder struct { + mock *MockCron +} + +// NewMockCron creates a new mock instance. +func NewMockCron(ctrl *gomock.Controller) *MockCron { + mock := &MockCron{ctrl: ctrl} + mock.recorder = &MockCronMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockCron) EXPECT() *MockCronMockRecorder { + return m.recorder +} + +// AddFunc mocks base method. +func (m *MockCron) AddFunc(spec string, cmd func()) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddFunc", spec, cmd) + ret0, _ := ret[0].(error) + m.cmd = cmd + return ret0 +} + +func (m *MockCron) Tick() { + m.cmd() +} + +// AddFunc indicates an expected call of AddFunc. +func (mr *MockCronMockRecorder) AddFunc(spec, cmd any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddFunc", reflect.TypeOf((*MockCron)(nil).AddFunc), spec, cmd) +} + +// Start mocks base method. +func (m *MockCron) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start. +func (mr *MockCronMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockCron)(nil).Start)) +} + +// Stop mocks base method. +func (m *MockCron) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. +func (mr *MockCronMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockCron)(nil).Stop)) +} From 0c7412adb0993cce66ba27b0929c94ecd888c947 Mon Sep 17 00:00:00 2001 From: Alan Kutniewski Date: Thu, 18 Jul 2024 10:07:57 +0200 Subject: [PATCH 2/7] Support for GCS source using Blob sync. Signed-off-by: Alan Kutniewski --- core/go.mod | 7 +++ core/go.sum | 21 ++++++++ core/pkg/sync/blob/blob_sync.go | 1 + core/pkg/sync/builder/syncbuilder.go | 36 ++++++++++++++ core/pkg/sync/builder/syncbuilder_test.go | 60 +++++++++++++++++++++++ core/pkg/sync/builder/utils.go | 7 ++- core/pkg/sync/builder/utils_test.go | 12 ++++- 7 files changed, 142 insertions(+), 2 deletions(-) diff --git a/core/go.mod b/core/go.mod index 887a97c9a..561d06f81 100644 --- a/core/go.mod +++ b/core/go.mod @@ -41,6 +41,10 @@ require ( ) require ( + cloud.google.com/go v0.112.1 // indirect + cloud.google.com/go/compute/metadata v0.3.0 // indirect + cloud.google.com/go/iam v1.1.6 // indirect + cloud.google.com/go/storage v1.39.1 // indirect github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver v1.5.0 // indirect github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df // indirect @@ -63,7 +67,9 @@ require ( github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect + github.com/google/s2a-go v0.1.7 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/huandu/xstrings v1.4.0 // indirect @@ -96,6 +102,7 @@ require ( golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/api v0.169.0 // indirect + google.golang.org/genproto v0.0.0-20240311173647-c811ad7063a7 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect google.golang.org/protobuf v1.34.2 // indirect diff --git a/core/go.sum b/core/go.sum index 598c2a025..52a82a72a 100644 --- a/core/go.sum +++ b/core/go.sum @@ -55,9 +55,11 @@ cloud.google.com/go v0.110.2/go.mod h1:k04UEeEtb6ZBRTv3dZz4CeJC3jKGxyhl0sAiVVqux cloud.google.com/go v0.110.4/go.mod h1:+EYjdK8e5RME/VY/qLCAtuyALQ9q67dvuum8i+H5xsI= cloud.google.com/go v0.110.6/go.mod h1:+EYjdK8e5RME/VY/qLCAtuyALQ9q67dvuum8i+H5xsI= cloud.google.com/go v0.110.7/go.mod h1:+EYjdK8e5RME/VY/qLCAtuyALQ9q67dvuum8i+H5xsI= +cloud.google.com/go v0.110.8 h1:tyNdfIxjzaWctIiLYOTalaLKZ17SI44SKFW26QbOhME= cloud.google.com/go v0.110.8/go.mod h1:Iz8AkXJf1qmxC3Oxoep8R1T36w8B92yU29PcBhHO5fk= cloud.google.com/go v0.110.9/go.mod h1:rpxevX/0Lqvlbc88b7Sc1SPNdyK1riNBTUU6JXhYNpM= cloud.google.com/go v0.110.10/go.mod h1:v1OoFqYxiBkUrruItNM3eT4lLByNjxmJSV/xDKJNnic= +cloud.google.com/go v0.112.1/go.mod h1:+Vbu+Y1UU+I1rjmzeMOb/8RfkKJK2Gyxi1X6jJCZLo4= cloud.google.com/go/accessapproval v1.4.0/go.mod h1:zybIuC3KpDOvotz59lFe5qxRZx6C75OtwbisN56xYB4= cloud.google.com/go/accessapproval v1.5.0/go.mod h1:HFy3tuiGvMdcd/u+Cu5b9NkO1pEICJ46IR82PoUdplw= cloud.google.com/go/accessapproval v1.6.0/go.mod h1:R0EiYnwV5fsRFiKZkPHr6mwyk2wxUJ30nL4j2pcFY2E= @@ -314,13 +316,17 @@ cloud.google.com/go/compute v1.19.3/go.mod h1:qxvISKp/gYnXkSAD1ppcSOveRAmzxicEv/ cloud.google.com/go/compute v1.20.1/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM= cloud.google.com/go/compute v1.21.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM= cloud.google.com/go/compute v1.23.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM= +cloud.google.com/go/compute v1.23.1 h1:V97tBoDaZHb6leicZ1G6DLK2BAaZLJ/7+9BB/En3hR0= cloud.google.com/go/compute v1.23.1/go.mod h1:CqB3xpmPKKt3OJpW2ndFIXnA9A4xAy/F3Xp1ixncW78= cloud.google.com/go/compute v1.23.2/go.mod h1:JJ0atRC0J/oWYiiVBmsSsrRnh92DhZPG4hFDcR04Rns= cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI= +cloud.google.com/go/compute v1.25.0 h1:H1/4SqSUhjPFE7L5ddzHOfY2bCAvjwNRZPNl6Ni5oYU= cloud.google.com/go/compute/metadata v0.1.0/go.mod h1:Z1VN+bulIf6bt4P/C37K4DyZYZEXYonfTBHHFPO/4UU= cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= +cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= +cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= cloud.google.com/go/contactcenterinsights v1.3.0/go.mod h1:Eu2oemoePuEFc/xKFPjbTuPSj0fYJcPls9TFlPNnHHY= cloud.google.com/go/contactcenterinsights v1.4.0/go.mod h1:L2YzkGbPsv+vMQMCADxJoT9YiTTnSEd6fEvCeHTYVck= cloud.google.com/go/contactcenterinsights v1.6.0/go.mod h1:IIDlT6CLcDoyv79kDv8iWxMSTZhLxSCofVV5W6YFM/w= @@ -600,9 +606,11 @@ cloud.google.com/go/iam v1.0.1/go.mod h1:yR3tmSL8BcZB4bxByRv2jkSIahVmCtfKZwLYGBa cloud.google.com/go/iam v1.1.0/go.mod h1:nxdHjaKfCr7fNYx/HJMM8LgiMugmveWlkatear5gVyk= cloud.google.com/go/iam v1.1.1/go.mod h1:A5avdyVL2tCppe4unb0951eI9jreack+RJ0/d+KUZOU= cloud.google.com/go/iam v1.1.2/go.mod h1:A5avdyVL2tCppe4unb0951eI9jreack+RJ0/d+KUZOU= +cloud.google.com/go/iam v1.1.3 h1:18tKG7DzydKWUnLjonWcJO6wjSCAtzh4GcRKlH/Hrzc= cloud.google.com/go/iam v1.1.3/go.mod h1:3khUlaBXfPKKe7huYgEpDn6FtgRyMEqbkvBxrQyY5SE= cloud.google.com/go/iam v1.1.4/go.mod h1:l/rg8l1AaA+VFMho/HYx2Vv6xinPSLMF8qfhRPIZ0L8= cloud.google.com/go/iam v1.1.5/go.mod h1:rB6P/Ic3mykPbFio+vo7403drjlgvoWfYpJhMXEbzv8= +cloud.google.com/go/iam v1.1.6/go.mod h1:O0zxdPeGBoFdWW3HWmBxJsk0pfvNM/p/qa82rWOGTwI= cloud.google.com/go/iap v1.4.0/go.mod h1:RGFwRJdihTINIe4wZ2iCP0zF/qu18ZwyKxrhMhygBEc= cloud.google.com/go/iap v1.5.0/go.mod h1:UH/CGgKd4KyohZL5Pt0jSKE4m3FR51qg6FKQ/z/Ix9A= cloud.google.com/go/iap v1.6.0/go.mod h1:NSuvI9C/j7UdjGjIde7t7HBz+QTwBcapPE07+sSRcLk= @@ -1016,6 +1024,10 @@ cloud.google.com/go/storage v1.27.0/go.mod h1:x9DOL8TK/ygDUMieqwfhdpQryTeEkhGKMi cloud.google.com/go/storage v1.28.1/go.mod h1:Qnisd4CqDdo6BGs2AD5LLnEsmSQ80wQ5ogcBBKhU86Y= cloud.google.com/go/storage v1.29.0/go.mod h1:4puEjyTKnku6gfKoTfNOU/W+a9JyuVNxjpS5GBrB8h4= cloud.google.com/go/storage v1.30.1/go.mod h1:NfxhC0UJE1aXSx7CIIbCf7y9HKT7BiccwkR7+P7gN8E= +cloud.google.com/go/storage v1.35.1 h1:B59ahL//eDfx2IIKFBeT5Atm9wnNmj3+8xG/W4WB//w= +cloud.google.com/go/storage v1.35.1/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYEsng2xgOs8= +cloud.google.com/go/storage v1.39.1 h1:MvraqHKhogCOTXTlct/9C3K3+Uy2jBmFYb3/Sp6dVtY= +cloud.google.com/go/storage v1.39.1/go.mod h1:xK6xZmxZmo+fyP7+DEF6FhNc24/JAe95OLyOHCXFH1o= cloud.google.com/go/storagetransfer v1.5.0/go.mod h1:dxNzUopWy7RQevYFHewchb29POFv3/AaBgnhqzqiK0w= cloud.google.com/go/storagetransfer v1.6.0/go.mod h1:y77xm4CQV/ZhFZH75PLEXY0ROiS7Gh6pSKrM8dJyg6I= cloud.google.com/go/storagetransfer v1.7.0/go.mod h1:8Giuj1QNb1kfLAiWM1bN6dHzfdlDAVC9rv9abHot2W4= @@ -1374,6 +1386,7 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4 github.com/google/s2a-go v0.1.0/go.mod h1:OJpEgntRZo8ugHpF9hkoLJbS5dSI20XZeXJ9JVywLlM= github.com/google/s2a-go v0.1.3/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= +github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -1388,6 +1401,7 @@ github.com/googleapis/enterprise-certificate-proxy v0.2.1/go.mod h1:AwSRAtLfXpU5 github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= github.com/googleapis/enterprise-certificate-proxy v0.2.4/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= github.com/googleapis/enterprise-certificate-proxy v0.2.5/go.mod h1:RxW0N9901Cko1VOCW3SXCpWP+mlIEkk2tP7jnHy9a3w= +github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= @@ -1403,6 +1417,7 @@ github.com/googleapis/gax-go/v2 v2.7.1/go.mod h1:4orTrqY6hXxxaUL4LHIPl6lGo8vAE38 github.com/googleapis/gax-go/v2 v2.8.0/go.mod h1:4orTrqY6hXxxaUL4LHIPl6lGo8vAE38/qKbhSAKP6QI= github.com/googleapis/gax-go/v2 v2.10.0/go.mod h1:4UOEnMCrxsSqQ940WnTiD6qJ63le2ev3xfyagutxiPw= github.com/googleapis/gax-go/v2 v2.11.0/go.mod h1:DxmR61SGKkGLa2xigwuZIQpkCI2S5iydzRfb3peWZJI= +github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= github.com/googleapis/gax-go/v2 v2.12.2 h1:mhN09QQW1jEWeMF74zGR81R30z4VJzjZsfkUhuHF+DA= github.com/googleapis/gax-go/v2 v2.12.2/go.mod h1:61M8vcyyXR2kqKFxKrfA22jaA8JGF7Dc8App1U3H6jc= @@ -2090,6 +2105,7 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= @@ -2168,6 +2184,8 @@ google.golang.org/api v0.126.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvy google.golang.org/api v0.128.0/go.mod h1:Y611qgqaE92On/7g65MQgxYul3c0rEB894kniWLY750= google.golang.org/api v0.139.0/go.mod h1:CVagp6Eekz9CjGZ718Z+sloknzkDJE7Vc1Ckj9+viBk= google.golang.org/api v0.149.0/go.mod h1:Mwn1B7JTXrzXtnvmzQE2BD6bYZQ8DShKZDZbeN9I7qI= +google.golang.org/api v0.150.0 h1:Z9k22qD289SZ8gCJrk4DrWXkNjtfvKAUo/l1ma8eBYE= +google.golang.org/api v0.150.0/go.mod h1:ccy+MJ6nrYFgE3WgRx/AMXOxOmU8Q4hSa+jjibzhxcg= google.golang.org/api v0.169.0 h1:QwWPy71FgMWqJN/l6jVlFHUa29a7dcUy02I8o799nPY= google.golang.org/api v0.169.0/go.mod h1:gpNOiMA2tZ4mf5R9Iwf4rK/Dcz0fbdIgWYWVoxmsyLg= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= @@ -2326,9 +2344,12 @@ google.golang.org/genproto v0.0.0-20230913181813-007df8e322eb/go.mod h1:yZTlhN0t google.golang.org/genproto v0.0.0-20230920204549-e6e6cdab5c13/go.mod h1:CCviP9RmpZ1mxVr8MUjCnSiY09IbAXZxhLE6EhHIdPU= google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97/go.mod h1:t1VqOqqvce95G3hIDCT5FeO3YUc6Q4Oe24L/+rNMxRk= google.golang.org/genproto v0.0.0-20231012201019-e917dd12ba7a/go.mod h1:EMfReVxb80Dq1hhioy0sOsY9jCE46YDgHlJ7fWVUWRE= +google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b h1:+YaDE2r2OG8t/z5qmsh7Y+XXwCbvadxxZ0YY6mTdrVA= google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:CgAqfJo+Xmu0GwA0411Ht3OU3OntXwsGmrmjI8ioGXI= google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405/go.mod h1:3WDQMjmJk36UQhjQ89emUzb1mdaHcPeeAh4SCBKznB4= google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:J7XzRzVy1+IPwWHZUzoD0IccYZIrXILAQpc+Qy9CMhY= +google.golang.org/genproto v0.0.0-20240311173647-c811ad7063a7 h1:ImUcDPHjTrAqNhlOkSocDLfG9rrNHH7w7uoKWPaWZ8s= +google.golang.org/genproto v0.0.0-20240311173647-c811ad7063a7/go.mod h1:/3XmxOjePkvmKrHuBy4zNFw7IzxJXtAgdpXi8Ll990U= google.golang.org/genproto/googleapis/api v0.0.0-20230525234020-1aefcd67740a/go.mod h1:ts19tUU+Z0ZShN1y3aPyq2+O3d5FUNNgT6FtOzmrNn8= google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= google.golang.org/genproto/googleapis/api v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= diff --git a/core/pkg/sync/blob/blob_sync.go b/core/pkg/sync/blob/blob_sync.go index ed53e8be7..20e9ecdc7 100644 --- a/core/pkg/sync/blob/blob_sync.go +++ b/core/pkg/sync/blob/blob_sync.go @@ -11,6 +11,7 @@ import ( "github.com/open-feature/flagd/core/pkg/logger" "github.com/open-feature/flagd/core/pkg/sync" "gocloud.dev/blob" + _ "gocloud.dev/blob/gcsblob" // needed to initialize GCS driver //nolint:gosec ) diff --git a/core/pkg/sync/builder/syncbuilder.go b/core/pkg/sync/builder/syncbuilder.go index 525ad7147..7d98a6015 100644 --- a/core/pkg/sync/builder/syncbuilder.go +++ b/core/pkg/sync/builder/syncbuilder.go @@ -10,6 +10,7 @@ import ( "github.com/open-feature/flagd/core/pkg/logger" "github.com/open-feature/flagd/core/pkg/sync" + blobSync "github.com/open-feature/flagd/core/pkg/sync/blob" "github.com/open-feature/flagd/core/pkg/sync/file" "github.com/open-feature/flagd/core/pkg/sync/grpc" "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials" @@ -17,6 +18,7 @@ import ( "github.com/open-feature/flagd/core/pkg/sync/kubernetes" "github.com/robfig/cron" "go.uber.org/zap" + "gocloud.dev/blob" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -27,6 +29,7 @@ const ( syncProviderGrpc = "grpc" syncProviderKubernetes = "kubernetes" syncProviderHTTP = "http" + syncProviderGcs = "gcs" ) var ( @@ -35,6 +38,7 @@ var ( regGRPC *regexp.Regexp regGRPCSecure *regexp.Regexp regFile *regexp.Regexp + regGcs *regexp.Regexp ) func init() { @@ -43,6 +47,7 @@ func init() { regGRPC = regexp.MustCompile("^" + grpc.Prefix) regGRPCSecure = regexp.MustCompile("^" + grpc.PrefixSecure) regFile = regexp.MustCompile("^file:") + regGcs = regexp.MustCompile("^gs://.+?/") } type ISyncBuilder interface { @@ -97,6 +102,9 @@ func (sb *SyncBuilder) syncFromConfig(sourceConfig sync.SourceConfig, logger *lo case syncProviderGrpc: logger.Debug(fmt.Sprintf("using grpc sync-provider for: %s", sourceConfig.URI)) return sb.newGRPC(sourceConfig, logger), nil + case syncProviderGcs: + logger.Debug(fmt.Sprintf("using blob sync-provider with gcs driver for: %s", sourceConfig.URI)) + return sb.newGcs(sourceConfig, logger), nil default: return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s' or '%s'", @@ -170,6 +178,34 @@ func (sb *SyncBuilder) newGRPC(config sync.SourceConfig, logger *logger.Logger) } } +func (sb *SyncBuilder) newGcs(config sync.SourceConfig, logger *logger.Logger) *blobSync.Sync { + // Extract bucket uri and object name from the full URI: + // gs://bucket/path/to/object results in gs://bucket/ as bucketUri and + // path/to/object as an object name. + bucketUri := regGcs.FindString(config.URI) + objectName := regGcs.ReplaceAllString(config.URI, "") + + // Defaults to 5 seconds if interval is not set. + var interval uint32 = 5 + if config.Interval != 0 { + interval = config.Interval + } + + return &blobSync.Sync{ + Bucket: bucketUri, + Object: objectName, + + BlobURLMux: blob.DefaultURLMux(), + + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "gcs"), + ), + Interval: interval, + Cron: cron.New(), + } +} + type IK8sClientBuilder interface { GetK8sClient() (dynamic.Interface, error) } diff --git a/core/pkg/sync/builder/syncbuilder_test.go b/core/pkg/sync/builder/syncbuilder_test.go index 1cdb3d404..9b88aa9d7 100644 --- a/core/pkg/sync/builder/syncbuilder_test.go +++ b/core/pkg/sync/builder/syncbuilder_test.go @@ -6,6 +6,7 @@ import ( "github.com/open-feature/flagd/core/pkg/logger" "github.com/open-feature/flagd/core/pkg/sync" + "github.com/open-feature/flagd/core/pkg/sync/blob" buildermock "github.com/open-feature/flagd/core/pkg/sync/builder/mock" "github.com/open-feature/flagd/core/pkg/sync/file" "github.com/open-feature/flagd/core/pkg/sync/grpc" @@ -231,6 +232,10 @@ func Test_SyncsFromFromConfig(t *testing.T) { URI: "my-namespace/my-flags", Provider: syncProviderKubernetes, }, + { + URI: "gs://bucket/path/to/file", + Provider: syncProviderGcs, + }, }, }, wantSyncs: []sync.ISync{ @@ -239,6 +244,7 @@ func Test_SyncsFromFromConfig(t *testing.T) { &http.Sync{}, &file.Sync{}, &kubernetes.Sync{}, + &blob.Sync{}, }, wantErr: false, }, @@ -264,3 +270,57 @@ func Test_SyncsFromFromConfig(t *testing.T) { }) } } + +func Test_GcsConfig(t *testing.T) { + lg := logger.NewLogger(nil, false) + defaultInterval := uint32(5) + tests := []struct { + name string + uri string + interval uint32 + expectedBucket string + expectedObject string + expectedInterval uint32 + }{ + { + name: "simple path", + uri: "gs://bucket/path/to/object", + interval: 10, + expectedBucket: "gs://bucket/", + expectedObject: "path/to/object", + expectedInterval: 10, + }, + { + name: "default interval", + uri: "gs://bucket/path/to/object", + expectedBucket: "gs://bucket/", + expectedObject: "path/to/object", + expectedInterval: defaultInterval, + }, + { + name: "no object set", // Blob syncer will return error when fetching + uri: "gs://bucket/", + expectedBucket: "gs://bucket/", + expectedObject: "", + expectedInterval: defaultInterval, + }, + { + name: "malformed uri", // Blob syncer will return error when opening bucket + uri: "malformed", + expectedBucket: "", + expectedObject: "malformed", + expectedInterval: defaultInterval, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gcsSync := NewSyncBuilder().newGcs(sync.SourceConfig{ + URI: tt.uri, + Interval: tt.interval, + }, lg) + require.Equal(t, tt.expectedBucket, gcsSync.Bucket) + require.Equal(t, tt.expectedObject, gcsSync.Object) + require.Equal(t, int(tt.expectedInterval), int(gcsSync.Interval)) + }) + } +} diff --git a/core/pkg/sync/builder/utils.go b/core/pkg/sync/builder/utils.go index f831a92fb..9e8740686 100644 --- a/core/pkg/sync/builder/utils.go +++ b/core/pkg/sync/builder/utils.go @@ -64,9 +64,14 @@ func ParseSyncProviderURIs(uris []string) ([]sync.SourceConfig, error) { Provider: syncProviderGrpc, TLS: true, }) + case regGcs.Match(uriB): + syncProvidersParsed = append(syncProvidersParsed, sync.SourceConfig{ + URI: uri, + Provider: syncProviderGcs, + }) default: return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', "+ - "'http(s)://', 'grpc(s)://', or 'core.openfeature.dev'", uri) + "'http(s)://', 'grpc(s)://', 'gs://' or 'core.openfeature.dev'", uri) } } return syncProvidersParsed, nil diff --git a/core/pkg/sync/builder/utils_test.go b/core/pkg/sync/builder/utils_test.go index 9459fa4a8..417e494cd 100644 --- a/core/pkg/sync/builder/utils_test.go +++ b/core/pkg/sync/builder/utils_test.go @@ -28,7 +28,8 @@ func TestParseSource(t *testing.T) { {"uri":"config/samples/example_flags.json","provider":"file"}, {"uri":"http://test.com","provider":"http","bearerToken":":)"}, {"uri":"host:port","provider":"grpc"}, - {"uri":"default/my-crd","provider":"kubernetes"} + {"uri":"default/my-crd","provider":"kubernetes"}, + {"uri":"gs://bucket-name/path/to/file","provider":"gcs"} ]`, expectErr: false, out: []sync.SourceConfig{ @@ -49,6 +50,10 @@ func TestParseSource(t *testing.T) { URI: "default/my-crd", Provider: syncProviderKubernetes, }, + { + URI: "gs://bucket-name/path/to/file", + Provider: syncProviderGcs, + }, }, }, "multiple-syncs-with-options": { @@ -182,6 +187,7 @@ func TestParseSyncProviderURIs(t *testing.T) { "grpc://host:port", "grpcs://secure-grpc", "core.openfeature.dev/default/my-crd", + "gs://bucket-name/path/to/file", }, expectErr: false, out: []sync.SourceConfig{ @@ -207,6 +213,10 @@ func TestParseSyncProviderURIs(t *testing.T) { URI: "default/my-crd", Provider: "kubernetes", }, + { + URI: "gs://bucket-name/path/to/file", + Provider: syncProviderGcs, + }, }, }, "empty": { From 15d40702be7de56f2168e0484fa7077d092ffa03 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 22 Jul 2024 19:24:07 +0000 Subject: [PATCH 3/7] fix(deps): update module connectrpc.com/otelconnect to v0.7.1 (#1367) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit [![Mend Renovate](https://app.renovatebot.com/images/banner.svg)](https://renovatebot.com) This PR contains the following updates: | Package | Change | Age | Adoption | Passing | Confidence | |---|---|---|---|---|---| | [connectrpc.com/otelconnect](https://togithub.com/connectrpc/otelconnect-go) | `v0.7.0` -> `v0.7.1` | [![age](https://developer.mend.io/api/mc/badges/age/go/connectrpc.com%2fotelconnect/v0.7.1?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://developer.mend.io/api/mc/badges/adoption/go/connectrpc.com%2fotelconnect/v0.7.1?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://developer.mend.io/api/mc/badges/compatibility/go/connectrpc.com%2fotelconnect/v0.7.0/v0.7.1?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://developer.mend.io/api/mc/badges/confidence/go/connectrpc.com%2fotelconnect/v0.7.0/v0.7.1?slim=true)](https://docs.renovatebot.com/merge-confidence/) | --- ### Release Notes
connectrpc/otelconnect-go (connectrpc.com/otelconnect) ### [`v0.7.1`](https://togithub.com/connectrpc/otelconnect-go/releases/tag/v0.7.1) [Compare Source](https://togithub.com/connectrpc/otelconnect-go/compare/v0.7.0...v0.7.1) This is a bug-fix release that addresses a race condition when closing a stream. #### What's Changed ##### Bugfixes - Fix data race in streaming client close by [@​emcfarlane](https://togithub.com/emcfarlane) in [#​173](https://togithub.com/connectrpc/otelconnect-go/issues/173) #### New Contributors - [@​gvacaliuc](https://togithub.com/gvacaliuc) made their first contribution in [#​163](https://togithub.com/connectrpc/otelconnect-go/issues/163) - [@​ytnsym](https://togithub.com/ytnsym) made their first contribution in [#​176](https://togithub.com/connectrpc/otelconnect-go/issues/176) - [@​drice-buf](https://togithub.com/drice-buf) made their first contribution in [#​178](https://togithub.com/connectrpc/otelconnect-go/issues/178) **Full Changelog**: https://github.com/connectrpc/otelconnect-go/compare/v0.7.0...v0.7.1
--- ### Configuration 📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Enabled. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] If you want to rebase/retry this PR, check this box --- This PR was generated by [Mend Renovate](https://www.mend.io/free-developer-tools/renovate/). View the [repository job log](https://developer.mend.io/github/open-feature/flagd). Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- core/go.mod | 2 +- core/go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/core/go.mod b/core/go.mod index 561d06f81..a5e9f48da 100644 --- a/core/go.mod +++ b/core/go.mod @@ -8,7 +8,7 @@ require ( buf.build/gen/go/open-feature/flagd/grpc/go v1.4.0-20240215170432-1e611e2999cc.2 buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.34.2-20240215170432-1e611e2999cc.2 connectrpc.com/connect v1.16.2 - connectrpc.com/otelconnect v0.7.0 + connectrpc.com/otelconnect v0.7.1 github.com/diegoholiveira/jsonlogic/v3 v3.5.3 github.com/fsnotify/fsnotify v1.7.0 github.com/open-feature/flagd-schemas v0.2.9-0.20240527214546-61523e5efe3e diff --git a/core/go.sum b/core/go.sum index 52a82a72a..d809ccdc2 100644 --- a/core/go.sum +++ b/core/go.sum @@ -1165,6 +1165,8 @@ connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE= connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc= connectrpc.com/otelconnect v0.7.0 h1:ZH55ZZtcJOTKWWLy3qmL4Pam4RzRWBJFOqTPyAqCXkY= connectrpc.com/otelconnect v0.7.0/go.mod h1:Bt2ivBymHZHqxvo4HkJ0EwHuUzQN6k2l0oH+mp/8nwc= +connectrpc.com/otelconnect v0.7.1 h1:scO5pOb0i4yUE66CnNrHeK1x51yq0bE0ehPg6WvzXJY= +connectrpc.com/otelconnect v0.7.1/go.mod h1:dh3bFgHBTb2bkqGCeVVOtHJreSns7uu9wwL2Tbz17ms= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc= From 224a656525ce3e1e7c49517febf4c536b7a687ce Mon Sep 17 00:00:00 2001 From: Alan Kutniewski Date: Tue, 23 Jul 2024 09:54:38 +0200 Subject: [PATCH 4/7] Fix lint and data race Signed-off-by: Alan Kutniewski --- core/pkg/sync/blob/blob_sync.go | 77 +++++++++++++--------------- core/pkg/sync/blob/blob_sync_test.go | 2 +- core/pkg/sync/blob/mock_blob.go | 7 ++- core/pkg/sync/builder/syncbuilder.go | 4 +- 4 files changed, 41 insertions(+), 49 deletions(-) diff --git a/core/pkg/sync/blob/blob_sync.go b/core/pkg/sync/blob/blob_sync.go index 20e9ecdc7..52f7cb9ed 100644 --- a/core/pkg/sync/blob/blob_sync.go +++ b/core/pkg/sync/blob/blob_sync.go @@ -12,7 +12,6 @@ import ( "github.com/open-feature/flagd/core/pkg/sync" "gocloud.dev/blob" _ "gocloud.dev/blob/gcsblob" // needed to initialize GCS driver - //nolint:gosec ) type Sync struct { @@ -33,7 +32,7 @@ type Cron interface { Stop() } -func (hs *Sync) Init(ctx context.Context) error { +func (hs *Sync) Init(_ context.Context) error { return nil } @@ -43,44 +42,21 @@ func (hs *Sync) IsReady() bool { func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { hs.Logger.Info(fmt.Sprintf("starting sync from %s/%s with interval %d", hs.Bucket, hs.Object, hs.Interval)) + _ = hs.Cron.AddFunc(fmt.Sprintf("*/%d * * * *", hs.Interval), func() { + err := hs.sync(ctx, dataSync, false) + if err != nil { + hs.Logger.Warn(fmt.Sprintf("sync failed: %v", err)) + } + }) // Initial fetch hs.Logger.Debug(fmt.Sprintf("initial sync of the %s/%s", hs.Bucket, hs.Object)) - err := hs.ReSync(ctx, dataSync) + err := hs.sync(ctx, dataSync, false) if err != nil { return err } - hs.ready = true - - hs.Logger.Debug(fmt.Sprintf("polling %s/%s every %d seconds", hs.Bucket, hs.Object, hs.Interval)) - _ = hs.Cron.AddFunc(fmt.Sprintf("*/%d * * * *", hs.Interval), func() { - hs.Logger.Debug(fmt.Sprintf("fetching configuration from %s/%s", hs.Bucket, hs.Object)) - bucket, err := hs.getBucket(ctx) - if err != nil { - hs.Logger.Warn(fmt.Sprintf("couldn't get bucket: %v", err)) - return - } - defer bucket.Close() - updated, err := hs.fetchObjectModificationTime(ctx, bucket) - if err != nil { - hs.Logger.Warn(fmt.Sprintf("couldn't get object attributes: %v", err)) - return - } - if hs.lastUpdated == updated { - hs.Logger.Debug("configuration hasn't changed, skipping fetching full object") - return - } - msg, err := hs.fetchObject(ctx, bucket) - if err != nil { - hs.Logger.Warn(fmt.Sprintf("couldn't get object: %v", err)) - return - } - hs.Logger.Info(fmt.Sprintf("configuration updated: %s", msg)) - dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object, Type: sync.ALL} - hs.lastUpdated = updated - }) + hs.ready = true hs.Cron.Start() - <-ctx.Done() hs.Cron.Stop() @@ -88,22 +64,35 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { } func (hs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error { + return hs.sync(ctx, dataSync, true) +} + +func (hs *Sync) sync(ctx context.Context, dataSync chan<- sync.DataSync, skipCheckingModTime bool) error { bucket, err := hs.getBucket(ctx) if err != nil { - return err + return fmt.Errorf("couldn't get bucket: %v", err) } defer bucket.Close() - updated, err := hs.fetchObjectModificationTime(ctx, bucket) - if err != nil { - return err + var updated time.Time + if !skipCheckingModTime { + updated, err = hs.fetchObjectModificationTime(ctx, bucket) + if err != nil { + return fmt.Errorf("couldn't get object attributes: %v", err) + } + if hs.lastUpdated == updated { + hs.Logger.Debug("configuration hasn't changed, skipping fetching full object") + return nil + } } msg, err := hs.fetchObject(ctx, bucket) if err != nil { - return err + return fmt.Errorf("couldn't get object: %v", err) + } + hs.Logger.Debug(fmt.Sprintf("configuration updated: %s", msg)) + if !skipCheckingModTime { + hs.lastUpdated = updated } - hs.Logger.Info(fmt.Sprintf("configuration updated: %s", msg)) dataSync <- sync.DataSync{FlagData: msg, Source: hs.Bucket + hs.Object, Type: sync.ALL} - hs.lastUpdated = updated return nil } @@ -111,7 +100,11 @@ func (hs *Sync) getBucket(ctx context.Context) (*blob.Bucket, error) { if hs.Bucket == "" { return nil, errors.New("no bucket string set") } - return hs.BlobURLMux.OpenBucket(ctx, hs.Bucket) + b, err := hs.BlobURLMux.OpenBucket(ctx, hs.Bucket) + if err != nil { + return nil, fmt.Errorf("error opening bucket %s: %v", hs.Bucket, err) + } + return b, nil } func (hs *Sync) fetchObjectModificationTime(ctx context.Context, bucket *blob.Bucket) (time.Time, error) { @@ -141,5 +134,5 @@ func (hs *Sync) fetchObject(ctx context.Context, bucket *blob.Bucket) (string, e return "", fmt.Errorf("error reading object %s/%s: %w", hs.Bucket, hs.Object, err) } - return string(buf.Bytes()), nil + return buf.String(), nil } diff --git a/core/pkg/sync/blob/blob_sync_test.go b/core/pkg/sync/blob/blob_sync_test.go index 8af9c18c3..63b0f66a3 100644 --- a/core/pkg/sync/blob/blob_sync_test.go +++ b/core/pkg/sync/blob/blob_sync_test.go @@ -63,7 +63,7 @@ func TestSync(t *testing.T) { } func tickWithConfigChange(t *testing.T, mockCron *synctesting.MockCron, dataSyncChan chan sync.DataSync, blobMock *MockBlob, newConfig string) { - time.Sleep(time.Millisecond) // sleep so the new file has different modification date + time.Sleep(1 * time.Millisecond) // sleep so the new file has different modification date blobMock.AddObject(object, newConfig) mockCron.Tick() select { diff --git a/core/pkg/sync/blob/mock_blob.go b/core/pkg/sync/blob/mock_blob.go index c0b4ea0b7..59d003f8d 100644 --- a/core/pkg/sync/blob/mock_blob.go +++ b/core/pkg/sync/blob/mock_blob.go @@ -11,7 +11,6 @@ import ( type MockBlob struct { mux *blob.URLMux - bucket *blob.Bucket scheme string opener *fakeOpener } @@ -23,13 +22,13 @@ type fakeOpener struct { getSync func() *Sync } -func (f *fakeOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) { - bucketUrl, err := url.Parse("mem://") +func (f *fakeOpener) OpenBucketURL(ctx context.Context, _ *url.URL) (*blob.Bucket, error) { + bucketURL, err := url.Parse("mem://") if err != nil { log.Fatalf("couldn't parse url: %s: %v", "mem://", err) } opener := &memblob.URLOpener{} - bucket, err := opener.OpenBucketURL(context.Background(), bucketUrl) + bucket, err := opener.OpenBucketURL(ctx, bucketURL) if err != nil { log.Fatalf("couldn't open in memory bucket: %v", err) } diff --git a/core/pkg/sync/builder/syncbuilder.go b/core/pkg/sync/builder/syncbuilder.go index 7d98a6015..a923a331f 100644 --- a/core/pkg/sync/builder/syncbuilder.go +++ b/core/pkg/sync/builder/syncbuilder.go @@ -182,7 +182,7 @@ func (sb *SyncBuilder) newGcs(config sync.SourceConfig, logger *logger.Logger) * // Extract bucket uri and object name from the full URI: // gs://bucket/path/to/object results in gs://bucket/ as bucketUri and // path/to/object as an object name. - bucketUri := regGcs.FindString(config.URI) + bucketURI := regGcs.FindString(config.URI) objectName := regGcs.ReplaceAllString(config.URI, "") // Defaults to 5 seconds if interval is not set. @@ -192,7 +192,7 @@ func (sb *SyncBuilder) newGcs(config sync.SourceConfig, logger *logger.Logger) * } return &blobSync.Sync{ - Bucket: bucketUri, + Bucket: bucketURI, Object: objectName, BlobURLMux: blob.DefaultURLMux(), From 186822eb760ad70a15a20397c19422dd660f8c6f Mon Sep 17 00:00:00 2001 From: Alan Kutniewski Date: Wed, 24 Jul 2024 10:51:43 +0200 Subject: [PATCH 5/7] Update docs with the information about GCS sync source Signed-off-by: Alan Kutniewski --- docs/concepts/syncs.md | 17 +++++++++++++++++ docs/reference/sync-configuration.md | 11 ++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/docs/concepts/syncs.md b/docs/concepts/syncs.md index 612ff7441..e0dc4ed30 100644 --- a/docs/concepts/syncs.md +++ b/docs/concepts/syncs.md @@ -68,6 +68,23 @@ In this example, `default/my_example` expected to be a valid FeatureFlag resourc namespace and `my_example` being the resource name. See [sync source](../reference/sync-configuration.md#source-configuration) configuration for details. +--- + +### GCS sync + +The GCS sync provider fetches flags from a GCS blob and periodically poll the GCS for the flag definition updates. +It uses [application default credentials](https://cloud.google.com/docs/authentication/application-default-credentials) if they +are [configured](https://cloud.google.com/docs/authentication/provide-credentials-adc) to authorize the calls to GCS. + +```shell +flagd start --uri gs://my-bucket/my-flags.json +``` + +In this example, `gs://my-bucket/my-flags.json` is expected to be a valid GCS URI accessible by the flagd +(either by being public or together with application default credentials). +The polling interval can be configured. +See [sync source](../reference/sync-configuration.md#source-configuration) configuration for details. + ## Merging Flagd can be configured to read from multiple sources at once, when this is the case flagd will merge all flag definition into a single diff --git a/docs/reference/sync-configuration.md b/docs/reference/sync-configuration.md index 0c3433295..902463903 100644 --- a/docs/reference/sync-configuration.md +++ b/docs/reference/sync-configuration.md @@ -17,6 +17,7 @@ it is passed to the correct implementation: | `file` | `file:` | `file:etc/flagd/my-flags.json` | | `http` | `http(s)://` | `https://my-flags.com/flags` | | `grpc` | `grpc(s)://` | `grpc://my-flags-server` | +| `gcs` | `gs://` | `gs://my-bucket/my-flags.json` | ## Source Configuration @@ -30,10 +31,10 @@ Alternatively, these configurations can be passed to flagd via config file, spec | Field | Type | Note | | ----------- | ------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | uri | required `string` | Flag configuration source of the sync | -| provider | required `string` | Provider type - `file`, `kubernetes`, `http`, or `grpc` | +| provider | required `string` | Provider type - `file`, `kubernetes`, `http`, `grpc` or `gcs` | | authHeader | optional `string` | Used for http sync; set this to include the complete `Authorization` header value for any authentication scheme (e.g., "Bearer token_here", "Basic base64_credentials", etc.). Cannot be used with `bearerToken` | | bearerToken | optional `string` | (Deprecated) Used for http sync; token gets appended to `Authorization` header with [bearer schema](https://www.rfc-editor.org/rfc/rfc6750#section-2.1). Cannot be used with `authHeader` | -| interval | optional `uint32` | Used for http sync; requests will be made at this interval. Defaults to 5 seconds. | +| interval | optional `uint32` | Used for http and gcs syncs; requests will be made at this interval. Defaults to 5 seconds. | | tls | optional `boolean` | Enable/Disable secure TLS connectivity. Currently used only by gRPC sync. Default (ex: if unset) is false, which will use an insecure connection | | providerID | optional `string` | Value binds to grpc connection's providerID field. gRPC server implementations may use this to identify connecting flagd instance | | selector | optional `string` | Value binds to grpc connection's selector field. gRPC server implementations may use this to filter flag configurations | @@ -54,6 +55,7 @@ Sync providers: - `kubernetes` - default/my-flag-config - `grpc`(insecure) - grpc-source:8080 - `grpcs`(secure) - my-flag-source:8080 +- `gcs` - gs://my-bucket/my-flags.json Startup command: @@ -66,7 +68,8 @@ Startup command: {"uri":"default/my-flag-config","provider":"kubernetes"}, {"uri":"grpc-source:8080","provider":"grpc"}, {"uri":"my-flag-source:8080","provider":"grpc", "maxMsgSize": 5242880}, - {"uri":"my-flag-source:8080","provider":"grpc", "certPath": "/certs/ca.cert", "tls": true, "providerID": "flagd-weatherapp-sidecar", "selector": "source=database,app=weatherapp"}]' + {"uri":"my-flag-source:8080","provider":"grpc", "certPath": "/certs/ca.cert", "tls": true, "providerID": "flagd-weatherapp-sidecar", "selector": "source=database,app=weatherapp"}, + {"uri":"gs://my-bucket/my-flag.json","provider":"gcs"}]' ``` Configuration file, @@ -91,4 +94,6 @@ sources: tls: true providerID: flagd-weatherapp-sidecar selector: "source=database,app=weatherapp" + - uri: gs://my-bucket/my-flag.json + provider: gcs ``` From 18952d88d59b8e13f0c32c584626166d02a44b51 Mon Sep 17 00:00:00 2001 From: Alan Kutniewski Date: Wed, 24 Jul 2024 16:47:43 +0200 Subject: [PATCH 6/7] Simplify the blob sync by using bucket.Download and validate bucket and boject in the init. Signed-off-by: Alan Kutniewski --- core/pkg/sync/blob/blob_sync.go | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/core/pkg/sync/blob/blob_sync.go b/core/pkg/sync/blob/blob_sync.go index 52f7cb9ed..228e6633a 100644 --- a/core/pkg/sync/blob/blob_sync.go +++ b/core/pkg/sync/blob/blob_sync.go @@ -5,7 +5,6 @@ import ( "context" "errors" "fmt" - "io" "time" "github.com/open-feature/flagd/core/pkg/logger" @@ -33,6 +32,12 @@ type Cron interface { } func (hs *Sync) Init(_ context.Context) error { + if hs.Bucket == "" { + return errors.New("no bucket string set") + } + if hs.Object == "" { + return errors.New("no object string set") + } return nil } @@ -97,9 +102,6 @@ func (hs *Sync) sync(ctx context.Context, dataSync chan<- sync.DataSync, skipChe } func (hs *Sync) getBucket(ctx context.Context) (*blob.Bucket, error) { - if hs.Bucket == "" { - return nil, errors.New("no bucket string set") - } b, err := hs.BlobURLMux.OpenBucket(ctx, hs.Bucket) if err != nil { return nil, fmt.Errorf("error opening bucket %s: %v", hs.Bucket, err) @@ -119,19 +121,10 @@ func (hs *Sync) fetchObjectModificationTime(ctx context.Context, bucket *blob.Bu } func (hs *Sync) fetchObject(ctx context.Context, bucket *blob.Bucket) (string, error) { - if hs.Object == "" { - return "", errors.New("no object string set") - } - r, err := bucket.NewReader(ctx, hs.Object, nil) - if err != nil { - return "", fmt.Errorf("error creating reader for object %s/%s: %w", hs.Bucket, hs.Object, err) - } - defer r.Close() - buf := bytes.NewBuffer(nil) - _, err = io.Copy(buf, r) + err := bucket.Download(ctx, hs.Object, buf, nil) if err != nil { - return "", fmt.Errorf("error reading object %s/%s: %w", hs.Bucket, hs.Object, err) + return "", fmt.Errorf("error downloading object %s/%s: %w", hs.Bucket, hs.Object, err) } return buf.String(), nil From 75c2ce96dc394e4c81edac80c1b95bd129550869 Mon Sep 17 00:00:00 2001 From: Alan Kutniewski Date: Thu, 25 Jul 2024 11:21:51 +0200 Subject: [PATCH 7/7] Fix log message Signed-off-by: Alan Kutniewski --- core/pkg/sync/blob/blob_sync.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/pkg/sync/blob/blob_sync.go b/core/pkg/sync/blob/blob_sync.go index 228e6633a..c3fc21d18 100644 --- a/core/pkg/sync/blob/blob_sync.go +++ b/core/pkg/sync/blob/blob_sync.go @@ -46,7 +46,7 @@ func (hs *Sync) IsReady() bool { } func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { - hs.Logger.Info(fmt.Sprintf("starting sync from %s/%s with interval %d", hs.Bucket, hs.Object, hs.Interval)) + hs.Logger.Info(fmt.Sprintf("starting sync from %s/%s with interval %ds", hs.Bucket, hs.Object, hs.Interval)) _ = hs.Cron.AddFunc(fmt.Sprintf("*/%d * * * *", hs.Interval), func() { err := hs.sync(ctx, dataSync, false) if err != nil { @@ -88,6 +88,9 @@ func (hs *Sync) sync(ctx context.Context, dataSync chan<- sync.DataSync, skipChe hs.Logger.Debug("configuration hasn't changed, skipping fetching full object") return nil } + if hs.lastUpdated.After(updated) { + hs.Logger.Warn("configuration changed but the modification time decreased instead of increasing") + } } msg, err := hs.fetchObject(ctx, bucket) if err != nil {