Skip to content

Commit

Permalink
feat: cache S3 client for each regions
Browse files Browse the repository at this point in the history
  • Loading branch information
handlename committed May 17, 2024
1 parent 1af7a29 commit 8d2ac37
Showing 1 changed file with 47 additions and 17 deletions.
64 changes: 47 additions & 17 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,14 @@ var (

// Router represents s3-object-router application
type Router struct {
s3 *s3.Client
awsConf aws.Config

// s3 clients for each region
s3 map[string]*s3.Client

// s3 bucket region cache
s3bucketRegion map[string]string

option *Option
sem *semaphore.Weighted

Expand All @@ -67,9 +74,13 @@ func New(opt *Option) (*Router, error) {
}

return &Router{
s3: s3.NewFromConfig(awsConf),
option: opt,
sem: semaphore.NewWeighted(int64(MaxConcurrency)),
awsConf: awsConf,
s3: map[string]*s3.Client{
awsConf.Region: s3.NewFromConfig(awsConf),
},
s3bucketRegion: map[string]string{},
option: opt,
sem: semaphore.NewWeighted(int64(MaxConcurrency)),
genKeyPrefix: func(r *record) (string, error) {
var b strings.Builder
if err := tmpl.Execute(&b, r.parsed); err != nil {
Expand Down Expand Up @@ -210,7 +221,7 @@ func (r *Router) getS3Object(ctx context.Context, s3url string) (io.ReadCloser,
return nil, errors.New("s3:// required")
}

s3c, err := r.getS3Client(ctx, u.Host)
s3c, err := r.s3Client(ctx, u.Host)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -252,7 +263,7 @@ func (r *Router) putToS3(ctx context.Context, dest destination, body io.ReadSeek
r.sem.Acquire(ctx, 1)
defer r.sem.Release(1)

s3c, err := r.getS3Client(ctx, dest.Bucket)
s3c, err := r.s3Client(ctx, dest.Bucket)
if err != nil {
return err
}
Expand All @@ -270,26 +281,45 @@ func (r *Router) putToS3(ctx context.Context, dest destination, body io.ReadSeek
return err
}

// getS3Client returns s3 client for same region as the bucket
func (r *Router) getS3Client(ctx context.Context, bucket string) (*s3.Client, error) {
bucketRegion, err := manager.GetBucketRegion(ctx, r.s3, bucket)
func (r *Router) defaultS3Client() *s3.Client {
return r.s3[r.awsConf.Region]
}

// s3Client returns s3 client for same region as the bucket
func (r *Router) s3Client(ctx context.Context, bucket string) (*s3.Client, error) {
bucketRegion, err := r.getS3BucketRegion(ctx, bucket)
if err != nil {
return nil, err
}

if r.s3.Options().Region == bucketRegion {
return r.s3, nil
if s3, ok := r.s3[bucketRegion]; ok {
return s3, nil
}

awsConfig, err := config.LoadDefaultConfig(
ctx,
config.WithRegion(bucketRegion),
)
awsConfig := r.awsConf.Copy()
awsConfig.Region = bucketRegion
s3 := s3.NewFromConfig(awsConfig)
r.s3[bucketRegion] = s3

return s3, nil
}

func (r *Router) getS3BucketRegion(ctx context.Context, bucket string) (string, error) {
log.Println("in getS3BucketRegion")
if region, ok := r.s3bucketRegion[bucket]; ok {
log.Printf("[debug] bucket region for %s is cached: %s\n", bucket, region)
return region, nil
}

region, err := manager.GetBucketRegion(ctx, r.defaultS3Client(), bucket)
if err != nil {
return nil, err
return "", err
}

return s3.NewFromConfig(awsConfig), nil
r.s3bucketRegion[bucket] = region
log.Printf("[debug] bucket region for %s is %s, added to cache\n", bucket, region)

return region, nil
}

type record struct {
Expand Down

0 comments on commit 8d2ac37

Please sign in to comment.