Skip to content

Commit

Permalink
cache seen connections
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Jul 24, 2024
1 parent 986d9fa commit 9e2b4e1
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 7 deletions.
41 changes: 40 additions & 1 deletion x-pack/filebeat/input/awss3/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"net/url"
"sync"
"time"

smithyhttp "github.com/aws/smithy-go/transport/http"
Expand Down Expand Up @@ -229,6 +230,19 @@ func (a *awsSQSAPI) GetQueueAttributes(ctx context.Context, attr []types.QueueAt

type awsS3API struct {
client *s3.Client

// others is the set of other clients referred
// to by notifications seen by API the connection.
// The number of cached elements is limited to
// awsS3APIcacheMax.
mu sync.RWMutex
others map[string]*s3.Client
}

const awsS3APIcacheMax = 100

func newAWSs3API(cli *s3.Client) *awsS3API {
return &awsS3API{client: cli, others: make(map[string]*s3.Client)}
}

func (a *awsS3API) GetObject(ctx context.Context, region, bucket, key string) (*s3.GetObjectOutput, error) {
Expand Down Expand Up @@ -294,8 +308,33 @@ func (a *awsS3API) clientFor(region string) *s3.Client {
if opts.Region == region {
return a.client
}
// Use a cached client if we have already seen this region.
a.mu.RLock()
cli, ok := a.others[region]
a.mu.RUnlock()
if ok {
return cli
}
// Otherwise create a new client and cache it.
opts.Region = region
return s3.New(opts)
cli = s3.New(opts)
// We may do this work more than once, but this will
// not cause any problems beyond minor fruitless work
// during warm-up.
a.mu.Lock()
// We should never be in the situation that the cache
// grows unbounded, but ensure this is the case.
if len(a.others) >= awsS3APIcacheMax {
// Do a single iteration delete to perform a
// random cache eviction.
for r := range a.others {
delete(a.others, r)
break
}
}
a.others[region] = cli
a.mu.Unlock()
return cli
}

func (a *awsS3API) ListObjectsPaginator(bucket, prefix string) s3Pager {
Expand Down
4 changes: 1 addition & 3 deletions x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ func createS3API(ctx context.Context, config config, awsConfig awssdk.Config) (*
s3Client = s3.NewFromConfig(awsConfig, config.s3ConfigModifier)
}

return &awsS3API{
client: s3Client,
}, nil
return newAWSs3API(s3Client), nil
}

func createPipelineClient(pipeline beat.Pipeline) (beat.Client, error) {
Expand Down
4 changes: 1 addition & 3 deletions x-pack/filebeat/input/awss3/sqs_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ func (in *sqsReaderInput) setup(
longPollWaitTime: in.config.SQSWaitTime,
}

in.s3 = &awsS3API{
client: s3.NewFromConfig(in.awsConfig, in.config.s3ConfigModifier),
}
in.s3 = newAWSs3API(s3.NewFromConfig(in.awsConfig, in.config.s3ConfigModifier))

in.metrics = newInputMetrics(inputContext.ID, nil, in.config.MaxNumberOfMessages)

Expand Down

0 comments on commit 9e2b4e1

Please sign in to comment.