From 986d9faa13cee2216cb5cc9a8e90c5afba425563 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Tue, 23 Jul 2024 14:29:34 +0930 Subject: [PATCH 1/3] x-pack/filebeat/input/awss3: allow cross-region bucket configuration --- CHANGELOG.next.asciidoc | 1 + .../input/awss3/input_benchmark_test.go | 6 +-- x-pack/filebeat/input/awss3/interfaces.go | 31 ++++++++---- .../input/awss3/mock_interfaces_test.go | 48 +++++++++---------- x-pack/filebeat/input/awss3/s3_objects.go | 6 +-- .../filebeat/input/awss3/s3_objects_test.go | 16 +++---- x-pack/filebeat/input/awss3/s3_test.go | 22 ++++----- 7 files changed, 72 insertions(+), 58 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 07b5a1bd24d..e8edbaec8f0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -313,6 +313,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Enhance input state reporting for CEL evaluations that return a single error object in events. {pull}40083[40083] - Allow absent credentials when using GCS with Application Default Credentials. {issue}39977[39977] {pull}40072[40072] - Update CEL mito extensions to v1.15.0. {pull}40294[40294] +- Allow cross-region bucket configuration in s3 input. {issue}22161[22161] {pull}40309[40309] *Auditbeat* diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index 09b7c8bd9d2..12837c410d2 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -143,15 +143,15 @@ func newConstantS3(t testing.TB) *constantS3 { } } -func (c constantS3) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) { +func (c constantS3) GetObject(ctx context.Context, _, bucket, key string) (*s3.GetObjectOutput, error) { return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil } -func (c constantS3) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) { +func (c constantS3) CopyObject(ctx context.Context, _, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) { return nil, nil } -func (c constantS3) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) { +func (c constantS3) DeleteObject(ctx context.Context, _, bucket, key string) (*s3.DeleteObjectOutput, error) { return nil, nil } diff --git a/x-pack/filebeat/input/awss3/interfaces.go b/x-pack/filebeat/input/awss3/interfaces.go index 1f1390c4f2f..38132cd07d4 100644 --- a/x-pack/filebeat/input/awss3/interfaces.go +++ b/x-pack/filebeat/input/awss3/interfaces.go @@ -30,6 +30,8 @@ import ( //go:generate go install github.com/golang/mock/mockgen@v1.6.0 //go:generate mockgen -source=interfaces.go -destination=mock_interfaces_test.go -package awss3 -mock_names=sqsAPI=MockSQSAPI,sqsProcessor=MockSQSProcessor,s3API=MockS3API,s3Pager=MockS3Pager,s3ObjectHandlerFactory=MockS3ObjectHandlerFactory,s3ObjectHandler=MockS3ObjectHandler //go:generate mockgen -destination=mock_publisher_test.go -package=awss3 -mock_names=Client=MockBeatClient,Pipeline=MockBeatPipeline github.com/elastic/beats/v7/libbeat/beat Client,Pipeline +//go:generate go-licenser -license Elastic . +//go:generate goimports -w -local github.com/elastic . // ------ // SQS interfaces @@ -79,12 +81,12 @@ type s3API interface { } type s3Getter interface { - GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) + GetObject(ctx context.Context, region, bucket, key string) (*s3.GetObjectOutput, error) } type s3Mover interface { - CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) - DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) + CopyObject(ctx context.Context, region, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) + DeleteObject(ctx context.Context, region, bucket, key string) (*s3.DeleteObjectOutput, error) } type s3Lister interface { @@ -229,8 +231,8 @@ type awsS3API struct { client *s3.Client } -func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) { - getObjectOutput, err := a.client.GetObject(ctx, &s3.GetObjectInput{ +func (a *awsS3API) GetObject(ctx context.Context, region, bucket, key string) (*s3.GetObjectOutput, error) { + getObjectOutput, err := a.clientFor(region).GetObject(ctx, &s3.GetObjectInput{ Bucket: awssdk.String(bucket), Key: awssdk.String(key), }, s3.WithAPIOptions( @@ -262,8 +264,8 @@ func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetOb return getObjectOutput, nil } -func (a *awsS3API) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) { - copyObjectOutput, err := a.client.CopyObject(ctx, &s3.CopyObjectInput{ +func (a *awsS3API) CopyObject(ctx context.Context, region, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) { + copyObjectOutput, err := a.clientFor(region).CopyObject(ctx, &s3.CopyObjectInput{ Bucket: awssdk.String(to_bucket), CopySource: awssdk.String(fmt.Sprintf("%s/%s", from_bucket, from_key)), Key: awssdk.String(to_key), @@ -274,8 +276,8 @@ func (a *awsS3API) CopyObject(ctx context.Context, from_bucket, to_bucket, from_ return copyObjectOutput, nil } -func (a *awsS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) { - deleteObjectOutput, err := a.client.DeleteObject(ctx, &s3.DeleteObjectInput{ +func (a *awsS3API) DeleteObject(ctx context.Context, region, bucket, key string) (*s3.DeleteObjectOutput, error) { + deleteObjectOutput, err := a.clientFor(region).DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: awssdk.String(bucket), Key: awssdk.String(key), }) @@ -285,6 +287,17 @@ func (a *awsS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.De return deleteObjectOutput, nil } +func (a *awsS3API) clientFor(region string) *s3.Client { + // Conditionally replace the client if the region of + // the request does not match the pre-prepared client. + opts := a.client.Options() + if opts.Region == region { + return a.client + } + opts.Region = region + return s3.New(opts) +} + func (a *awsS3API) ListObjectsPaginator(bucket, prefix string) s3Pager { pager := s3.NewListObjectsV2Paginator(a.client, &s3.ListObjectsV2Input{ Bucket: awssdk.String(bucket), diff --git a/x-pack/filebeat/input/awss3/mock_interfaces_test.go b/x-pack/filebeat/input/awss3/mock_interfaces_test.go index b976cf00ebc..ccae48a59b2 100644 --- a/x-pack/filebeat/input/awss3/mock_interfaces_test.go +++ b/x-pack/filebeat/input/awss3/mock_interfaces_test.go @@ -314,48 +314,48 @@ func (m *MockS3API) EXPECT() *MockS3APIMockRecorder { } // CopyObject mocks base method. -func (m *MockS3API) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) { +func (m *MockS3API) CopyObject(ctx context.Context, region, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CopyObject", ctx, from_bucket, to_bucket, from_key, to_key) + ret := m.ctrl.Call(m, "CopyObject", ctx, region, from_bucket, to_bucket, from_key, to_key) ret0, _ := ret[0].(*s3.CopyObjectOutput) ret1, _ := ret[1].(error) return ret0, ret1 } // CopyObject indicates an expected call of CopyObject. -func (mr *MockS3APIMockRecorder) CopyObject(ctx, from_bucket, to_bucket, from_key, to_key interface{}) *gomock.Call { +func (mr *MockS3APIMockRecorder) CopyObject(ctx, region, from_bucket, to_bucket, from_key, to_key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CopyObject", reflect.TypeOf((*MockS3API)(nil).CopyObject), ctx, from_bucket, to_bucket, from_key, to_key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CopyObject", reflect.TypeOf((*MockS3API)(nil).CopyObject), ctx, region, from_bucket, to_bucket, from_key, to_key) } // DeleteObject mocks base method. -func (m *MockS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) { +func (m *MockS3API) DeleteObject(ctx context.Context, region, bucket, key string) (*s3.DeleteObjectOutput, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteObject", ctx, bucket, key) + ret := m.ctrl.Call(m, "DeleteObject", ctx, region, bucket, key) ret0, _ := ret[0].(*s3.DeleteObjectOutput) ret1, _ := ret[1].(error) return ret0, ret1 } // DeleteObject indicates an expected call of DeleteObject. -func (mr *MockS3APIMockRecorder) DeleteObject(ctx, bucket, key interface{}) *gomock.Call { +func (mr *MockS3APIMockRecorder) DeleteObject(ctx, region, bucket, key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockS3API)(nil).DeleteObject), ctx, bucket, key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockS3API)(nil).DeleteObject), ctx, region, bucket, key) } // GetObject mocks base method. -func (m *MockS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) { +func (m *MockS3API) GetObject(ctx context.Context, region, bucket, key string) (*s3.GetObjectOutput, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetObject", ctx, bucket, key) + ret := m.ctrl.Call(m, "GetObject", ctx, region, bucket, key) ret0, _ := ret[0].(*s3.GetObjectOutput) ret1, _ := ret[1].(error) return ret0, ret1 } // GetObject indicates an expected call of GetObject. -func (mr *MockS3APIMockRecorder) GetObject(ctx, bucket, key interface{}) *gomock.Call { +func (mr *MockS3APIMockRecorder) GetObject(ctx, region, bucket, key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*MockS3API)(nil).GetObject), ctx, bucket, key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*MockS3API)(nil).GetObject), ctx, region, bucket, key) } // ListObjectsPaginator mocks base method. @@ -396,18 +396,18 @@ func (m *Mocks3Getter) EXPECT() *Mocks3GetterMockRecorder { } // GetObject mocks base method. -func (m *Mocks3Getter) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) { +func (m *Mocks3Getter) GetObject(ctx context.Context, region, bucket, key string) (*s3.GetObjectOutput, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetObject", ctx, bucket, key) + ret := m.ctrl.Call(m, "GetObject", ctx, region, bucket, key) ret0, _ := ret[0].(*s3.GetObjectOutput) ret1, _ := ret[1].(error) return ret0, ret1 } // GetObject indicates an expected call of GetObject. -func (mr *Mocks3GetterMockRecorder) GetObject(ctx, bucket, key interface{}) *gomock.Call { +func (mr *Mocks3GetterMockRecorder) GetObject(ctx, region, bucket, key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*Mocks3Getter)(nil).GetObject), ctx, bucket, key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*Mocks3Getter)(nil).GetObject), ctx, region, bucket, key) } // Mocks3Mover is a mock of s3Mover interface. @@ -434,33 +434,33 @@ func (m *Mocks3Mover) EXPECT() *Mocks3MoverMockRecorder { } // CopyObject mocks base method. -func (m *Mocks3Mover) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) { +func (m *Mocks3Mover) CopyObject(ctx context.Context, region, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CopyObject", ctx, from_bucket, to_bucket, from_key, to_key) + ret := m.ctrl.Call(m, "CopyObject", ctx, region, from_bucket, to_bucket, from_key, to_key) ret0, _ := ret[0].(*s3.CopyObjectOutput) ret1, _ := ret[1].(error) return ret0, ret1 } // CopyObject indicates an expected call of CopyObject. -func (mr *Mocks3MoverMockRecorder) CopyObject(ctx, from_bucket, to_bucket, from_key, to_key interface{}) *gomock.Call { +func (mr *Mocks3MoverMockRecorder) CopyObject(ctx, region, from_bucket, to_bucket, from_key, to_key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CopyObject", reflect.TypeOf((*Mocks3Mover)(nil).CopyObject), ctx, from_bucket, to_bucket, from_key, to_key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CopyObject", reflect.TypeOf((*Mocks3Mover)(nil).CopyObject), ctx, region, from_bucket, to_bucket, from_key, to_key) } // DeleteObject mocks base method. -func (m *Mocks3Mover) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) { +func (m *Mocks3Mover) DeleteObject(ctx context.Context, region, bucket, key string) (*s3.DeleteObjectOutput, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteObject", ctx, bucket, key) + ret := m.ctrl.Call(m, "DeleteObject", ctx, region, bucket, key) ret0, _ := ret[0].(*s3.DeleteObjectOutput) ret1, _ := ret[1].(error) return ret0, ret1 } // DeleteObject indicates an expected call of DeleteObject. -func (mr *Mocks3MoverMockRecorder) DeleteObject(ctx, bucket, key interface{}) *gomock.Call { +func (mr *Mocks3MoverMockRecorder) DeleteObject(ctx, region, bucket, key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*Mocks3Mover)(nil).DeleteObject), ctx, bucket, key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*Mocks3Mover)(nil).DeleteObject), ctx, region, bucket, key) } // Mocks3Lister is a mock of s3Lister interface. diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index 05ee572343f..59e73758fc3 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -201,7 +201,7 @@ func (p *s3ObjectProcessor) ProcessS3Object() error { // Content-Type and reader to get the object's contents. The caller must // close the returned reader. func (p *s3ObjectProcessor) download() (contentType string, metadata map[string]interface{}, body io.ReadCloser, err error) { - getObjectOutput, err := p.s3.GetObject(p.ctx, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key) + getObjectOutput, err := p.s3.GetObject(p.ctx, p.s3Obj.AWSRegion, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key) if err != nil { return "", nil, nil, err } @@ -441,14 +441,14 @@ func (p *s3ObjectProcessor) FinalizeS3Object() error { return nil } backupKey := p.backupConfig.BackupToBucketPrefix + p.s3Obj.S3.Object.Key - _, err := p.s3.CopyObject(p.ctx, p.s3Obj.S3.Bucket.Name, bucketName, p.s3Obj.S3.Object.Key, backupKey) + _, err := p.s3.CopyObject(p.ctx, p.s3Obj.AWSRegion, p.s3Obj.S3.Bucket.Name, bucketName, p.s3Obj.S3.Object.Key, backupKey) if err != nil { return fmt.Errorf("failed to copy object to backup bucket: %w", err) } if !p.backupConfig.Delete { return nil } - _, err = p.s3.DeleteObject(p.ctx, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key) + _, err = p.s3.DeleteObject(p.ctx, p.s3Obj.AWSRegion, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key) if err != nil { return fmt.Errorf("failed to delete object from bucket: %w", err) } diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index df50726823f..635955ed8c4 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -153,7 +153,7 @@ func TestS3ObjectProcessor(t *testing.T) { s3Event := newS3Event("log.txt") mockS3API.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). + GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). Return(nil, errFakeConnectivityFailure) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}) @@ -175,7 +175,7 @@ func TestS3ObjectProcessor(t *testing.T) { s3Event := newS3Event("log.txt") mockS3API.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). + GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). Return(nil, nil) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{}) @@ -197,7 +197,7 @@ func TestS3ObjectProcessor(t *testing.T) { var events []beat.Event gomock.InOrder( mockS3API.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). + GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). Return(s3Resp, nil), mockPublisher.EXPECT(). Publish(gomock.Any()). @@ -227,7 +227,7 @@ func TestS3ObjectProcessor(t *testing.T) { gomock.InOrder( mockS3API.EXPECT(). - CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)). + CopyObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)). Return(nil, nil), ) @@ -254,10 +254,10 @@ func TestS3ObjectProcessor(t *testing.T) { gomock.InOrder( mockS3API.EXPECT(). - CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)). + CopyObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)). Return(nil, nil), mockS3API.EXPECT(). - DeleteObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). + DeleteObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). Return(nil, nil), ) @@ -284,7 +284,7 @@ func TestS3ObjectProcessor(t *testing.T) { gomock.InOrder( mockS3API.EXPECT(). - CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq("backup/testdata/log.txt")). + CopyObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq("backup/testdata/log.txt")). Return(nil, nil), ) @@ -326,7 +326,7 @@ func _testProcessS3Object(t testing.TB, file, contentType string, numEvents int, var events []beat.Event gomock.InOrder( mockS3API.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). + GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)). Return(s3Resp, nil), mockPublisher.EXPECT(). Publish(gomock.Any()). diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go index 216d9866e73..9c6099e775a 100644 --- a/x-pack/filebeat/input/awss3/s3_test.go +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -103,27 +103,27 @@ func TestS3Poller(t *testing.T) { }) mockAPI.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key1")). + GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key1")). Return(nil, errFakeConnectivityFailure) mockAPI.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key2")). + GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key2")). Return(nil, errFakeConnectivityFailure) mockAPI.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key3")). + GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key3")). Return(nil, errFakeConnectivityFailure) mockAPI.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key4")). + GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key4")). Return(nil, errFakeConnectivityFailure) mockAPI.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key5")). + GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key5")). Return(nil, errFakeConnectivityFailure) mockAPI.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("2024-02-08T08:35:00+00:02.json.gz")). + GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("2024-02-08T08:35:00+00:02.json.gz")). Return(nil, errFakeConnectivityFailure) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, nil, backupConfig{}) @@ -245,23 +245,23 @@ func TestS3Poller(t *testing.T) { }) mockS3.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key1")). + GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key1")). Return(nil, errFakeConnectivityFailure) mockS3.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key2")). + GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key2")). Return(nil, errFakeConnectivityFailure) mockS3.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key3")). + GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key3")). Return(nil, errFakeConnectivityFailure) mockS3.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key4")). + GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key4")). Return(nil, errFakeConnectivityFailure) mockS3.EXPECT(). - GetObject(gomock.Any(), gomock.Eq(bucket), gomock.Eq("key5")). + GetObject(gomock.Any(), gomock.Eq(""), gomock.Eq(bucket), gomock.Eq("key5")). Return(nil, errFakeConnectivityFailure) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3, nil, backupConfig{}) From d5a82982040451aeeb05f92778b7f77480fc8758 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 25 Jul 2024 07:09:21 +0930 Subject: [PATCH 2/3] cache seen connections --- x-pack/filebeat/input/awss3/interfaces.go | 48 ++++++++++++++++++++++- x-pack/filebeat/input/awss3/s3.go | 4 +- x-pack/filebeat/input/awss3/sqs_input.go | 4 +- 3 files changed, 49 insertions(+), 7 deletions(-) diff --git a/x-pack/filebeat/input/awss3/interfaces.go b/x-pack/filebeat/input/awss3/interfaces.go index 38132cd07d4..38ad0106ef1 100644 --- a/x-pack/filebeat/input/awss3/interfaces.go +++ b/x-pack/filebeat/input/awss3/interfaces.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "net/url" + "sync" "time" smithyhttp "github.com/aws/smithy-go/transport/http" @@ -229,6 +230,19 @@ func (a *awsSQSAPI) GetQueueAttributes(ctx context.Context, attr []types.QueueAt type awsS3API struct { client *s3.Client + + // others is the set of other clients referred + // to by notifications seen by API the connection. + // The number of cached elements is limited to + // awsS3APIcacheMax. + mu sync.RWMutex + others map[string]*s3.Client +} + +const awsS3APIcacheMax = 100 + +func newAWSs3API(cli *s3.Client) *awsS3API { + return &awsS3API{client: cli, others: make(map[string]*s3.Client)} } func (a *awsS3API) GetObject(ctx context.Context, region, bucket, key string) (*s3.GetObjectOutput, error) { @@ -294,8 +308,40 @@ func (a *awsS3API) clientFor(region string) *s3.Client { if opts.Region == region { return a.client } + // Use a cached client if we have already seen this region. + a.mu.RLock() + cli, ok := a.others[region] + a.mu.RUnlock() + if ok { + return cli + } + + a.mu.Lock() + defer a.mu.Unlock() + + // Check that another writer did not beat us here. + cli, ok = a.others[region] + if ok { + // ... they did. + return cli + } + + // Otherwise create a new client and cache it. opts.Region = region - return s3.New(opts) + cli = s3.New(opts) + // We should never be in the situation that the cache + // grows unbounded, but ensure this is the case. + if len(a.others) >= awsS3APIcacheMax { + // Do a single iteration delete to perform a + // random cache eviction. + for r := range a.others { + delete(a.others, r) + break + } + } + a.others[region] = cli + + return cli } func (a *awsS3API) ListObjectsPaginator(bucket, prefix string) s3Pager { diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index eb8e19c2cf9..d611470ec80 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -29,9 +29,7 @@ func createS3API(ctx context.Context, config config, awsConfig awssdk.Config) (* s3Client = s3.NewFromConfig(awsConfig, config.s3ConfigModifier) } - return &awsS3API{ - client: s3Client, - }, nil + return newAWSs3API(s3Client), nil } func createPipelineClient(pipeline beat.Pipeline) (beat.Client, error) { diff --git a/x-pack/filebeat/input/awss3/sqs_input.go b/x-pack/filebeat/input/awss3/sqs_input.go index e524cf9fd1c..a92319cbe19 100644 --- a/x-pack/filebeat/input/awss3/sqs_input.go +++ b/x-pack/filebeat/input/awss3/sqs_input.go @@ -103,9 +103,7 @@ func (in *sqsReaderInput) setup( longPollWaitTime: in.config.SQSWaitTime, } - in.s3 = &awsS3API{ - client: s3.NewFromConfig(in.awsConfig, in.config.s3ConfigModifier), - } + in.s3 = newAWSs3API(s3.NewFromConfig(in.awsConfig, in.config.s3ConfigModifier)) in.metrics = newInputMetrics(inputContext.ID, nil, in.config.MaxNumberOfMessages) From f9fc55253ac5979a61eef08c2abcb73dc66a6081 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Fri, 26 Jul 2024 06:49:11 +0930 Subject: [PATCH 3/3] fix grammar --- x-pack/filebeat/input/awss3/interfaces.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/interfaces.go b/x-pack/filebeat/input/awss3/interfaces.go index 38ad0106ef1..b5a0c408ae2 100644 --- a/x-pack/filebeat/input/awss3/interfaces.go +++ b/x-pack/filebeat/input/awss3/interfaces.go @@ -232,7 +232,7 @@ type awsS3API struct { client *s3.Client // others is the set of other clients referred - // to by notifications seen by API the connection. + // to by notifications seen by the API connection. // The number of cached elements is limited to // awsS3APIcacheMax. mu sync.RWMutex