Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/awss3: allow cross-region bucket configuration #40309

Merged
merged 3 commits into from
Jul 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Enhance input state reporting for CEL evaluations that return a single error object in events. {pull}40083[40083]
- Allow absent credentials when using GCS with Application Default Credentials. {issue}39977[39977] {pull}40072[40072]
- Update CEL mito extensions to v1.15.0. {pull}40294[40294]
- Allow cross-region bucket configuration in s3 input. {issue}22161[22161] {pull}40309[40309]

*Auditbeat*

Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,15 @@ func newConstantS3(t testing.TB) *constantS3 {
}
}

func (c constantS3) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) {
func (c constantS3) GetObject(ctx context.Context, _, bucket, key string) (*s3.GetObjectOutput, error) {
return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil
}

func (c constantS3) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) {
func (c constantS3) CopyObject(ctx context.Context, _, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) {
return nil, nil
}

func (c constantS3) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) {
func (c constantS3) DeleteObject(ctx context.Context, _, bucket, key string) (*s3.DeleteObjectOutput, error) {
return nil, nil
}

Expand Down
77 changes: 68 additions & 9 deletions x-pack/filebeat/input/awss3/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"net/url"
"sync"
"time"

smithyhttp "github.com/aws/smithy-go/transport/http"
Expand All @@ -30,6 +31,8 @@ import (
//go:generate go install github.com/golang/mock/mockgen@v1.6.0
//go:generate mockgen -source=interfaces.go -destination=mock_interfaces_test.go -package awss3 -mock_names=sqsAPI=MockSQSAPI,sqsProcessor=MockSQSProcessor,s3API=MockS3API,s3Pager=MockS3Pager,s3ObjectHandlerFactory=MockS3ObjectHandlerFactory,s3ObjectHandler=MockS3ObjectHandler
//go:generate mockgen -destination=mock_publisher_test.go -package=awss3 -mock_names=Client=MockBeatClient,Pipeline=MockBeatPipeline github.com/elastic/beats/v7/libbeat/beat Client,Pipeline
//go:generate go-licenser -license Elastic .
//go:generate goimports -w -local github.com/elastic .

// ------
// SQS interfaces
Expand Down Expand Up @@ -79,12 +82,12 @@ type s3API interface {
}

type s3Getter interface {
GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error)
GetObject(ctx context.Context, region, bucket, key string) (*s3.GetObjectOutput, error)
}

type s3Mover interface {
CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error)
DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error)
CopyObject(ctx context.Context, region, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error)
DeleteObject(ctx context.Context, region, bucket, key string) (*s3.DeleteObjectOutput, error)
}

type s3Lister interface {
Expand Down Expand Up @@ -227,10 +230,23 @@ func (a *awsSQSAPI) GetQueueAttributes(ctx context.Context, attr []types.QueueAt

type awsS3API struct {
client *s3.Client

// others is the set of other clients referred
// to by notifications seen by the API connection.
// The number of cached elements is limited to
// awsS3APIcacheMax.
mu sync.RWMutex
others map[string]*s3.Client
}

const awsS3APIcacheMax = 100

func newAWSs3API(cli *s3.Client) *awsS3API {
return &awsS3API{client: cli, others: make(map[string]*s3.Client)}
}

func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetObjectOutput, error) {
getObjectOutput, err := a.client.GetObject(ctx, &s3.GetObjectInput{
func (a *awsS3API) GetObject(ctx context.Context, region, bucket, key string) (*s3.GetObjectOutput, error) {
getObjectOutput, err := a.clientFor(region).GetObject(ctx, &s3.GetObjectInput{
Bucket: awssdk.String(bucket),
Key: awssdk.String(key),
}, s3.WithAPIOptions(
Expand Down Expand Up @@ -262,8 +278,8 @@ func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetOb
return getObjectOutput, nil
}

func (a *awsS3API) CopyObject(ctx context.Context, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) {
copyObjectOutput, err := a.client.CopyObject(ctx, &s3.CopyObjectInput{
func (a *awsS3API) CopyObject(ctx context.Context, region, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) {
copyObjectOutput, err := a.clientFor(region).CopyObject(ctx, &s3.CopyObjectInput{
Bucket: awssdk.String(to_bucket),
CopySource: awssdk.String(fmt.Sprintf("%s/%s", from_bucket, from_key)),
Key: awssdk.String(to_key),
Expand All @@ -274,8 +290,8 @@ func (a *awsS3API) CopyObject(ctx context.Context, from_bucket, to_bucket, from_
return copyObjectOutput, nil
}

func (a *awsS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.DeleteObjectOutput, error) {
deleteObjectOutput, err := a.client.DeleteObject(ctx, &s3.DeleteObjectInput{
func (a *awsS3API) DeleteObject(ctx context.Context, region, bucket, key string) (*s3.DeleteObjectOutput, error) {
deleteObjectOutput, err := a.clientFor(region).DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: awssdk.String(bucket),
Key: awssdk.String(key),
})
Expand All @@ -285,6 +301,49 @@ func (a *awsS3API) DeleteObject(ctx context.Context, bucket, key string) (*s3.De
return deleteObjectOutput, nil
}

func (a *awsS3API) clientFor(region string) *s3.Client {
// Conditionally replace the client if the region of
// the request does not match the pre-prepared client.
opts := a.client.Options()
if opts.Region == region {
return a.client
}
// Use a cached client if we have already seen this region.
a.mu.RLock()
cli, ok := a.others[region]
a.mu.RUnlock()
if ok {
return cli
}

a.mu.Lock()
defer a.mu.Unlock()

// Check that another writer did not beat us here.
cli, ok = a.others[region]
if ok {
// ... they did.
return cli
}

// Otherwise create a new client and cache it.
opts.Region = region
cli = s3.New(opts)
// We should never be in the situation that the cache
// grows unbounded, but ensure this is the case.
if len(a.others) >= awsS3APIcacheMax {
// Do a single iteration delete to perform a
// random cache eviction.
for r := range a.others {
delete(a.others, r)
break
}
}
a.others[region] = cli

return cli
}

func (a *awsS3API) ListObjectsPaginator(bucket, prefix string) s3Pager {
pager := s3.NewListObjectsV2Paginator(a.client, &s3.ListObjectsV2Input{
Bucket: awssdk.String(bucket),
Expand Down
48 changes: 24 additions & 24 deletions x-pack/filebeat/input/awss3/mock_interfaces_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ func createS3API(ctx context.Context, config config, awsConfig awssdk.Config) (*
s3Client = s3.NewFromConfig(awsConfig, config.s3ConfigModifier)
}

return &awsS3API{
client: s3Client,
}, nil
return newAWSs3API(s3Client), nil
}

func createPipelineClient(pipeline beat.Pipeline) (beat.Client, error) {
Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (p *s3ObjectProcessor) ProcessS3Object() error {
// Content-Type and reader to get the object's contents. The caller must
// close the returned reader.
func (p *s3ObjectProcessor) download() (contentType string, metadata map[string]interface{}, body io.ReadCloser, err error) {
getObjectOutput, err := p.s3.GetObject(p.ctx, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
getObjectOutput, err := p.s3.GetObject(p.ctx, p.s3Obj.AWSRegion, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
if err != nil {
return "", nil, nil, err
}
Expand Down Expand Up @@ -441,14 +441,14 @@ func (p *s3ObjectProcessor) FinalizeS3Object() error {
return nil
}
backupKey := p.backupConfig.BackupToBucketPrefix + p.s3Obj.S3.Object.Key
_, err := p.s3.CopyObject(p.ctx, p.s3Obj.S3.Bucket.Name, bucketName, p.s3Obj.S3.Object.Key, backupKey)
_, err := p.s3.CopyObject(p.ctx, p.s3Obj.AWSRegion, p.s3Obj.S3.Bucket.Name, bucketName, p.s3Obj.S3.Object.Key, backupKey)
if err != nil {
return fmt.Errorf("failed to copy object to backup bucket: %w", err)
}
if !p.backupConfig.Delete {
return nil
}
_, err = p.s3.DeleteObject(p.ctx, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
_, err = p.s3.DeleteObject(p.ctx, p.s3Obj.AWSRegion, p.s3Obj.S3.Bucket.Name, p.s3Obj.S3.Object.Key)
if err != nil {
return fmt.Errorf("failed to delete object from bucket: %w", err)
}
Expand Down
16 changes: 8 additions & 8 deletions x-pack/filebeat/input/awss3/s3_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestS3ObjectProcessor(t *testing.T) {
s3Event := newS3Event("log.txt")

mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, errFakeConnectivityFailure)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{})
Expand All @@ -175,7 +175,7 @@ func TestS3ObjectProcessor(t *testing.T) {
s3Event := newS3Event("log.txt")

mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, nil)

s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockS3API, nil, backupConfig{})
Expand All @@ -197,7 +197,7 @@ func TestS3ObjectProcessor(t *testing.T) {
var events []beat.Event
gomock.InOrder(
mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(s3Resp, nil),
mockPublisher.EXPECT().
Publish(gomock.Any()).
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestS3ObjectProcessor(t *testing.T) {

gomock.InOrder(
mockS3API.EXPECT().
CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)).
CopyObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, nil),
)

Expand All @@ -254,10 +254,10 @@ func TestS3ObjectProcessor(t *testing.T) {

gomock.InOrder(
mockS3API.EXPECT().
CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)).
CopyObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq("backup"), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, nil),
mockS3API.EXPECT().
DeleteObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
DeleteObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(nil, nil),
)

Expand All @@ -284,7 +284,7 @@ func TestS3ObjectProcessor(t *testing.T) {

gomock.InOrder(
mockS3API.EXPECT().
CopyObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq("backup/testdata/log.txt")).
CopyObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key), gomock.Eq("backup/testdata/log.txt")).
Return(nil, nil),
)

Expand Down Expand Up @@ -326,7 +326,7 @@ func _testProcessS3Object(t testing.TB, file, contentType string, numEvents int,
var events []beat.Event
gomock.InOrder(
mockS3API.EXPECT().
GetObject(gomock.Any(), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(s3Event.S3.Bucket.Name), gomock.Eq(s3Event.S3.Object.Key)).
Return(s3Resp, nil),
mockPublisher.EXPECT().
Publish(gomock.Any()).
Expand Down
Loading
Loading