Merge branch 'main' into dependabot/go_modules/github.com/grpc-ecosystem/grpc-gateway/v2-2.16.2
markphelps authored Aug 8, 2023
2 parents fa8179a + ed6b7f0 commit 7c71272
Showing 9 changed files with 335 additions and 55 deletions.
4 changes: 2 additions & 2 deletions build/internal/cmd/minio/main.go
@@ -83,15 +83,15 @@ func main() {

fmt.Fprintln(os.Stderr, "Copying", path)

name := d.Name()
f, err := dir.Open(path)
if err != nil {
return err
}
defer f.Close()
// use path to get full path to file
_, err = s3Client.PutObject(ctx, &s3.PutObjectInput{
Bucket: &bucketName,
Key: &name,
Key: &path,
Body: f,
})

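The change above swaps d.Name() for path as the object key. A minimal sketch of why, assuming the loop in the diff is the body of an fs.WalkDir callback; the in-memory filesystem and file names below are hypothetical:

package main

import (
	"fmt"
	"io/fs"
	"testing/fstest"
)

func main() {
	// Hypothetical tree with two files sharing a base name.
	dir := fstest.MapFS{
		"a/flipt.yml": {Data: []byte("a")},
		"b/flipt.yml": {Data: []byte("b")},
	}
	_ = fs.WalkDir(dir, ".", func(path string, d fs.DirEntry, err error) error {
		if err != nil || d.IsDir() {
			return err
		}
		// d.Name() is only the base name, so both files would collide on
		// the key "flipt.yml"; path keeps the directory part and stays unique.
		fmt.Printf("name=%q path=%q\n", d.Name(), path)
		return nil
	})
}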
21 changes: 21 additions & 0 deletions internal/config/config_test.go
@@ -669,6 +669,27 @@ func TestLoad(t *testing.T) {
return cfg
},
},
{
name: "s3 full config provided",
path: "./testdata/storage/s3_full.yml",
expected: func() *Config {
cfg := DefaultConfig()
cfg.Experimental.FilesystemStorage.Enabled = true
cfg.Storage = StorageConfig{
Type: ObjectStorageType,
Object: &Object{
Type: S3ObjectSubStorageType,
S3: &S3{
Bucket: "testbucket",
Prefix: "prefix",
Region: "region",
PollInterval: 5 * time.Minute,
},
},
}
return cfg
},
},
{
name: "s3 config invalid",
path: "./testdata/storage/s3_bucket_missing.yml",
1 change: 1 addition & 0 deletions internal/config/storage.go
@@ -119,6 +119,7 @@ func (o *Object) validate() error {
type S3 struct {
Endpoint string `json:"endpoint,omitempty" mapstructure:"endpoint"`
Bucket string `json:"bucket,omitempty" mapstructure:"bucket"`
Prefix string `json:"prefix,omitempty" mapstructure:"prefix"`
Region string `json:"region,omitempty" mapstructure:"region"`
PollInterval time.Duration `json:"pollInterval,omitempty" mapstructure:"poll_interval"`
}
12 changes: 12 additions & 0 deletions internal/config/testdata/storage/s3_full.yml
@@ -0,0 +1,12 @@
experimental:
filesystem_storage:
enabled: true
storage:
type: object
object:
type: s3
s3:
bucket: "testbucket"
prefix: "prefix"
region: "region"
poll_interval: "5m"
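The poll_interval string above decodes into the time.Duration field on the S3 struct expected by the new test case. A sketch of that decoding, assuming the config loader uses mapstructure with the standard string-to-duration hook (as viper-based loaders typically do); the struct here mirrors internal/config/storage.go but is redeclared locally:

package main

import (
	"fmt"
	"time"

	"github.com/mitchellh/mapstructure"
)

// Local mirror of the S3 config struct from internal/config/storage.go.
type S3 struct {
	Bucket       string        `mapstructure:"bucket"`
	Prefix       string        `mapstructure:"prefix"`
	Region       string        `mapstructure:"region"`
	PollInterval time.Duration `mapstructure:"poll_interval"`
}

func main() {
	// Raw values as a YAML loader would hand them to the decoder.
	raw := map[string]any{
		"bucket":        "testbucket",
		"prefix":        "prefix",
		"region":        "region",
		"poll_interval": "5m",
	}

	var out S3
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		// The string-to-duration hook is what turns "5m" into 5 * time.Minute.
		DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
		Result:     &out,
	})
	if err != nil {
		panic(err)
	}
	if err := dec.Decode(raw); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out) // {Bucket:testbucket Prefix:prefix Region:region PollInterval:5m0s}
}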
108 changes: 76 additions & 32 deletions internal/s3fs/s3fs.go
@@ -4,6 +4,7 @@ import (
"context"
"io"
"io/fs"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/service/s3"
@@ -24,9 +25,12 @@ type FS struct {
logger *zap.Logger
s3Client S3ClientAPI

bucket string
// configuration
bucket string
prefix string

// cached entries
dirEntry *Dir
entries []fs.DirEntry
}

// ensure FS implements fs.FS aka Open
@@ -39,11 +43,12 @@ var _ fs.StatFS = &FS{}
var _ fs.ReadDirFS = &FS{}

// New creates a FS for the single bucket
func New(logger *zap.Logger, s3Client S3ClientAPI, bucket string) (*FS, error) {
func New(logger *zap.Logger, s3Client S3ClientAPI, bucket string, prefix string) (*FS, error) {
return &FS{
logger: logger,
s3Client: s3Client,
bucket: bucket,
prefix: prefix,
}, nil
}

@@ -63,6 +68,12 @@ func (f *FS) Open(name string) (fs.File, error) {
return nil, pathError
}

// If a prefix is provided and the name does not already
// include it, prepend the prefix. This allows s3fs to
// support `.flipt.yml` under the prefix
if f.prefix != "" && !strings.HasPrefix(name, f.prefix) {
name = f.prefix + name
}

output, err := f.s3Client.GetObject(context.Background(),
&s3.GetObjectInput{
Bucket: &f.bucket,
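A tiny sketch of the key computation in Open above, with a hypothetical prefix and file names. The concatenation is verbatim, so a prefix must carry its own trailing "/" to behave like a directory:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical prefix; the trailing "/" is supplied by the caller.
	prefix := "flags/"
	for _, name := range []string{".flipt.yml", "flags/features.yml"} {
		key := name
		// Same check as Open: prepend only when the prefix is set and
		// the name does not already start with it.
		if prefix != "" && !strings.HasPrefix(key, prefix) {
			key = prefix + key
		}
		fmt.Printf("Open(%q) -> GetObject key %q\n", name, key)
	}
}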
@@ -109,48 +120,81 @@ func (f *FS) Stat(name string) (fs.FileInfo, error) {
f.dirEntry = &Dir{
FileInfo: dirInfo,
}
output, err := f.s3Client.ListObjectsV2(context.Background(),
&s3.ListObjectsV2Input{
Bucket: &f.bucket,
})
if err != nil {
return nil, err
}

f.entries = make([]fs.DirEntry, len(output.Contents))
for i := range output.Contents {
c := output.Contents[i]
fi := &FileInfo{
name: *c.Key,
size: c.Size,
modTime: *c.LastModified,
}
f.entries[i] = fi
if dirInfo.modTime.IsZero() ||
dirInfo.modTime.Compare(fi.modTime) < 0 {
dirInfo.modTime = fi.modTime
}
}
// AWS S3 does not store the last modified time for the bucket
// anywhere. We'd have to iterate through all the objects to
// calculate it, which doesn't seem worth it.

return f.dirEntry, nil
}

// ReadDir implements fs.ReadDirFS. For the s3 filesystem, this
// returns the previously fetched objects in the bucket. This can only
// be called on the current directory as the s3 filesystem does not
// support any kind of recursive directory structure
// ReadDir implements fs.ReadDirFS. This can only be called on the
// current directory as the s3 filesystem does not support any kind of
// recursive directory structure
func (f *FS) ReadDir(name string) ([]fs.DirEntry, error) {
// ReadDir can only be called after Stat and only on the
// current directory, aka "." or the bucket
if (name != "." && name != f.bucket) || f.entries == nil {
// ReadDir can only be called on the current directory, aka
// "." or the bucket
if name != "." && name != f.bucket {
return nil, &fs.PathError{
Op: "ReadDir",
Path: name,
Err: fs.ErrInvalid,
}
}

return f.entries, nil
// If a prefix is provided, only list objects with that prefix
// This lets the user configure a portion of a bucket for
// feature flags, simulating a subdirectory.
//
// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html
var prefix *string
if f.prefix != "" {
prefix = &f.prefix
}

// instead of caching the entries in Open, fetch them here so
// if the list is large, they are not stored on the FS object.
entries := []fs.DirEntry{}

// loop until all results are retrieved, but don't loop more
// than 100 times (creating 100,000 entries) as a safety
// measure to ensure we don't run out of memory and/or loop
// forever
var continuationToken *string
for i := 0; i < 100; i++ {
output, err := f.s3Client.ListObjectsV2(context.Background(),
&s3.ListObjectsV2Input{
Bucket: &f.bucket,
Prefix: prefix,
ContinuationToken: continuationToken,
})
if err != nil {
return nil, err
}

for i := range output.Contents {
c := output.Contents[i]
fi := &FileInfo{
name: *c.Key,
size: c.Size,
modTime: *c.LastModified,
}
entries = append(entries, fi)
}
if !output.IsTruncated {
return entries, nil
}
continuationToken = output.NextContinuationToken
}

// We looped more than 100 times. Instead of silently
// truncating, return an error. Should we return a custom
// error?
return nil, &fs.PathError{
Op: "ReadDir",
Path: name,
Err: fs.ErrClosed,
}
}

type Dir struct {
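The token loop in the new ReadDir follows the standard ListObjectsV2 pagination pattern. A standalone sketch of that pattern against the AWS SDK directly, with hypothetical bucket and prefix values; note IsTruncated is a plain bool in the SDK version this commit builds against, while newer releases changed it to *bool:

package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		panic(err)
	}
	client := s3.NewFromConfig(cfg)

	bucket, prefix := "testbucket", "prefix" // hypothetical values

	var token *string
	for i := 0; i < 100; i++ { // same 100-page safety cap as ReadDir
		out, err := client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
			Bucket:            &bucket,
			Prefix:            &prefix,
			ContinuationToken: token,
		})
		if err != nil {
			panic(err)
		}
		for _, obj := range out.Contents {
			fmt.Println(*obj.Key)
		}
		if !out.IsTruncated {
			return
		}
		token = out.NextContinuationToken
	}
}

Newer code could also reach for s3.NewListObjectsV2Paginator, which encapsulates the same continuation-token handling.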