Skip to content

Commit

Permalink
feat: enhances S3Download to filter by traced table names (#1374)
Browse files Browse the repository at this point in the history
This PR updates the `S3Download` implementation so that callers can specify
the names of the traced tables to download from an S3 bucket. Filtering by
table name reduces the amount of data downloaded, which is useful when
working with the traced data of large network tests. If no table name is
provided, `S3Download` downloads all the traced tables.
  • Loading branch information
staheri14 committed Jun 6, 2024
1 parent 68d87ab commit 800924f
Showing 1 changed file with 44 additions and 28 deletions.
72 changes: 44 additions & 28 deletions pkg/trace/fileserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,9 @@ func (lt *LocalTracer) PushAll() error {

// S3Download downloads files that match some prefix from an S3 bucket to a
// local directory dst.
func S3Download(dst, prefix string, cfg S3Config) error {
// fileNames lists the traced jsonl files to download. If it is empty, all traces are downloaded.
// Entries in fileNames must omit the .jsonl suffix; it is appended before matching against the end of each object key.
func S3Download(dst, prefix string, fileNames []string, cfg S3Config) error {
// Ensure local directory structure exists
err := os.MkdirAll(dst, os.ModePerm)
if err != nil {
Expand Down Expand Up @@ -288,37 +290,51 @@ func S3Download(dst, prefix string, cfg S3Config) error {

err = s3Svc.ListObjectsV2Pages(input, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
for _, content := range page.Contents {
localFilePath := filepath.Join(dst, prefix, strings.TrimPrefix(*content.Key, prefix))
fmt.Printf("Downloading %s to %s\n", *content.Key, localFilePath)
key := *content.Key

// Create the directories in the path
if err := os.MkdirAll(filepath.Dir(localFilePath), os.ModePerm); err != nil {
return false
// If no fileNames are specified, download all files
if len(fileNames) == 0 {
fileNames = append(fileNames, strings.TrimPrefix(key, prefix))
}

// Create a file to write the S3 Object contents to.
f, err := os.Create(localFilePath)
if err != nil {
return false
}

resp, err := s3Svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(cfg.BucketName),
Key: aws.String(*content.Key),
})
if err != nil {
f.Close()
continue
for _, filename := range fileNames {
// Add .jsonl suffix to the fileNames
fullFilename := filename + ".jsonl"
if strings.HasSuffix(key, fullFilename) {
localFilePath := filepath.Join(dst, prefix, strings.TrimPrefix(key, prefix))
fmt.Printf("Downloading %s to %s\n", key, localFilePath)

// Create the directories in the path
if err := os.MkdirAll(filepath.Dir(localFilePath), os.ModePerm); err != nil {
return false
}

// Create a file to write the S3 Object contents to.
f, err := os.Create(localFilePath)
if err != nil {
return false
}

resp, err := s3Svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(cfg.BucketName),
Key: aws.String(key),
})
if err != nil {
f.Close()
continue
}
defer resp.Body.Close()

// Copy the contents of the S3 object to the local file
if _, err := io.Copy(f, resp.Body); err != nil {
f.Close()
return false
}

fmt.Printf("Successfully downloaded %s to %s\n", key, localFilePath)
f.Close()
}
}
defer resp.Body.Close()

// Copy the contents of the S3 object to the local file
if _, err := io.Copy(f, resp.Body); err != nil {
return false
}

fmt.Printf("Successfully downloaded %s to %s\n", *content.Key, localFilePath)
f.Close()
}
return !lastPage // continue paging
})
Expand Down

0 comments on commit 800924f

Please sign in to comment.