From 10252ebd0d39f88ad1ef427c01661385a542913f Mon Sep 17 00:00:00 2001
From: Erick Fejta
Date: Fri, 6 Mar 2020 00:14:32 -0800
Subject: [PATCH] Set a ceiling on the number of extra files to attach.

Otherwise the server will reject invocations describing large jobs (which
have tens of thousands of files).
---
 experiment/resultstore/convert.go      |  20 ++-
 experiment/resultstore/convert_test.go | 193 ++++++++++++++++++++++++-
 experiment/resultstore/main.go         | 142 +++++++++----------
 experiment/resultstore/main_test.go    |  39 +----
 4 files changed, 292 insertions(+), 102 deletions(-)

diff --git a/experiment/resultstore/convert.go b/experiment/resultstore/convert.go
index e6256b3b2d93..06e04b4e1e16 100644
--- a/experiment/resultstore/convert.go
+++ b/experiment/resultstore/convert.go
@@ -118,7 +118,7 @@ func convertSuiteMeta(suiteMeta gcs.SuitesMeta) resultstore.Suite {
 }
 
 // Convert converts build metadata stored in gcp into the corresponding ResultStore Invocation, Target and Test.
-func convert(project, details string, url gcs.Path, result downloadResult) (resultstore.Invocation, resultstore.Target, resultstore.Test) {
+func convert(project, details string, url gcs.Path, result downloadResult, maxFiles int) (resultstore.Invocation, resultstore.Target, resultstore.Test) {
 	started := result.started
 	finished := result.finished
 	artifacts := result.artifactURLs
@@ -175,13 +175,18 @@ func convert(project, details string, url gcs.Path, result downloadResult) (resu
 		artifacts[i] = "gs://" + bucket + "/" + a
 	}
 
+	var total int // files attached so far, across the invocation and the test suite
 	for _, a := range artifacts { // add started.json, etc to the invocation artifact list.
+		if total >= maxFiles {
+			continue
+		}
 		if strings.HasPrefix(a, artifactsPath) {
 			continue // things under artifacts/ are owned by the test
 		}
 		if a == buildLog {
 			continue // Handle this in InvocationLog
 		}
+		total++
 		inv.Files = append(inv.Files, resultstore.File{
 			ID:          uniqPath(a),
 			ContentType: "text/plain",
@@ -237,6 +242,9 @@ func convert(project, details string, url gcs.Path, result downloadResult) (resu
 	}
 
 	for _, a := range artifacts {
+		if total >= maxFiles {
+			continue
+		}
 		if !strings.HasPrefix(a, artifactsPath) {
 			continue // Non-artifacts (started.json, etc) are owned by the invocation
 		}
@@ -254,6 +262,7 @@ func convert(project, details string, url gcs.Path, result downloadResult) (resu
 		if found {
 			continue
 		}
+		total++
 		test.Suite.Files = append(test.Suite.Files, resultstore.File{
 			ID:          uniqPath(a),
 			ContentType: "text/plain",
@@ -261,6 +270,15 @@ func convert(project, details string, url gcs.Path, result downloadResult) (resu
 		})
 	}
 
+	if total >= maxFiles {
+		// TODO(fejta): expose this edge case to the user in a better way
+		inv.Files = append(inv.Files, resultstore.File{
+			ID:          fmt.Sprintf("exceeded %d files", maxFiles),
+			ContentType: "text/plain",
+			URL:         basePath,
+		})
+	}
+
 	test.Suite.Start = inv.Start
 	test.Action.Start = inv.Start
 	test.Suite.Duration = inv.Duration
diff --git a/experiment/resultstore/convert_test.go b/experiment/resultstore/convert_test.go
index afee14aa01fa..1d99777dd994 100644
--- a/experiment/resultstore/convert_test.go
+++ b/experiment/resultstore/convert_test.go
@@ -125,6 +125,7 @@ func TestConvertProjectMetadataToResultStoreArtifacts(t *testing.T) {
 		details            string
 		url                string
 		result             downloadResult
+		maxFiles           int
 		expectedInvocation resultstore.Invocation
 		expectedTarget     resultstore.Target
 		expectedTest       resultstore.Test
@@ -396,6 +397,193 @@ func TestConvertProjectMetadataToResultStoreArtifacts(t *testing.T) {
 				},
 			},
 		},
+		{
+			name:     "Reject excessive artifacts",
+			maxFiles: 3,
+			project:  "projectX",
"projectX", + details: "detailY", + url: "gs://bucket/logs/jobA/1234567890123456789", + result: downloadResult{ + started: gcs.Started{ + Started: metadata.Started{ + Timestamp: 1234567890, + Repos: map[string]string{ + "org/repoA": "branchB", + }, + DeprecatedRepoVersion: "aadb2b88d190a38b59f512b4d8c508a88cf839e1", + }, + Pending: false, + }, + finished: gcs.Finished{ + Finished: metadata.Finished{ + Result: "SUCCESS", + DeprecatedRevision: "master", + }, + Running: false, + }, + artifactURLs: []string{ + "logs/jobA/1234567890123456789/artifacts/junit_runner.xml", + "logs/jobA/1234567890123456789/artifacts/1", + "logs/jobA/1234567890123456789/artifacts/2", + "logs/jobA/1234567890123456789/artifacts/3", + "logs/jobA/1234567890123456789/artifacts/4", + "logs/jobA/1234567890123456789/artifacts/5", + "logs/jobA/1234567890123456789/build-log.txt", + }, + suiteMetas: []gcs.SuitesMeta{ + { + Suites: junit.Suites{ + XMLName: xml.Name{}, + Suites: []junit.Suite{ + { + XMLName: xml.Name{Space: "testsuite", Local: ""}, + Time: 10.5, + Failures: 0, + Tests: 2, + Results: []junit.Result{ + { + Name: "Result1", + Time: 3.2, + ClassName: "test1", + Properties: &junit.Properties{ + PropertyList: []junit.Property{ + {Name: "p1", Value: "v1"}, + }, + }, + }, + { + Name: "Result2", + Time: 7.3, + ClassName: "test2", + Properties: &junit.Properties{ + PropertyList: []junit.Property{ + {Name: "p2", Value: "v2"}, + }, + }, + }, + }, + }, + }, + }, + Metadata: map[string]string{ + "Context": "runner", + }, + Path: "gs://bucket/logs/jobA/1234567890123456789/artifacts/junit_runner.xml", + }, + }, + }, + expectedInvocation: resultstore.Invocation{ + Project: "projectX", + Details: "detailY", + Files: []resultstore.File{ + { + ID: resultstore.InvocationLog, + ContentType: "text/plain", + URL: "gs://bucket/logs/jobA/1234567890123456789/build-log.txt", + }, + { + ID: "exceeded 3 files", + ContentType: "text/plain", + URL: "gs://bucket/logs/jobA/1234567890123456789/", + }, + }, + Properties: []resultstore.Property{ + {Key: "Job", Value: "jobA"}, + {Key: "Pull", Value: ""}, + {Key: "Org", Value: "org"}, + {Key: "Branch", Value: "branchB"}, + {Key: "Repo", Value: "repoA"}, + {Key: "Repo", Value: "org/repoA"}, + {Key: "Repo", Value: "org/repoA:branchB"}, + }, + Start: time.Unix(1234567890, 0), + Status: resultstore.Running, + Description: "In progress...", + }, + expectedTarget: resultstore.Target{ + Status: resultstore.Running, + Description: "In progress...", + Start: time.Unix(1234567890, 0), + Properties: []resultstore.Property{ + {Key: "Result1:p1", Value: "v1"}, + {Key: "Result2:p2", Value: "v2"}, + }, + }, + expectedTest: resultstore.Test{ + Suite: resultstore.Suite{ + Name: "test", + Start: time.Unix(1234567890, 0), + Files: []resultstore.File{ + { + ID: resultstore.TargetLog, + ContentType: "text/plain", + URL: "gs://bucket/logs/jobA/1234567890123456789/build-log.txt", + }, + { + ID: "artifacts/junit_runner.xml", + ContentType: "text/xml", + URL: "gs://bucket/logs/jobA/1234567890123456789/artifacts/junit_runner.xml", + }, + { + ID: "artifacts/1", + ContentType: "text/plain", + URL: "gs://bucket/logs/jobA/1234567890123456789/artifacts/1", + }, + { + ID: "artifacts/2", + ContentType: "text/plain", + URL: "gs://bucket/logs/jobA/1234567890123456789/artifacts/2", + }, + { + ID: "artifacts/3", + ContentType: "text/plain", + URL: "gs://bucket/logs/jobA/1234567890123456789/artifacts/3", + }, + }, + Suites: []resultstore.Suite{ + { + Name: "junit_runner.xml", + Duration: dur(10.5), + Files: []resultstore.File{ + { + ID: 
"junit_runner.xml", + ContentType: "text/xml", + URL: "gs://bucket/logs/jobA/1234567890123456789/artifacts/junit_runner.xml", + }, + }, + Suites: []resultstore.Suite{ + { + Cases: []resultstore.Case{ + { + Name: "Result1", + Class: "test1", + Result: resultstore.Completed, + Duration: dur(3.2), + }, + { + Name: "Result2", + Class: "test2", + Result: resultstore.Completed, + Duration: dur(7.3), + }, + }, + Duration: dur(10.5), + Properties: []resultstore.Property{ + {Key: "Result1:p1", Value: "v1"}, + {Key: "Result2:p2", Value: "v2"}, + }, + }, + }, + }, + }, + }, + Action: resultstore.Action{ + Start: time.Unix(1234567890, 0), + Status: resultstore.Running, + Description: "In progress...", + }, + }, + }, { name: "Convert full project metadata", project: "projectX", @@ -566,7 +754,10 @@ func TestConvertProjectMetadataToResultStoreArtifacts(t *testing.T) { if err != nil { t.Errorf("incorrect url: %v", err) } - invocation, target, test := convert(tc.project, tc.details, *urlPath, tc.result) + if tc.maxFiles == 0 { + tc.maxFiles = 40000 + } + invocation, target, test := convert(tc.project, tc.details, *urlPath, tc.result, tc.maxFiles) if diff := cmp.Diff(invocation, tc.expectedInvocation, cmpOption); diff != "" { t.Errorf("%s:%s mismatch (-got +want):\n%s", tc.name, "invocation", diff) } diff --git a/experiment/resultstore/main.go b/experiment/resultstore/main.go index ba4c189c5aea..3ab834af3c41 100644 --- a/experiment/resultstore/main.go +++ b/experiment/resultstore/main.go @@ -62,19 +62,20 @@ func stripTags(str string) (string, []string) { } type options struct { - path gcs.Path - jobs flagutil.Strings + account string buckets flagutil.Strings - deadline time.Duration + gcsAuth bool + jobs flagutil.Strings latest int override bool - account string - gcsAuth bool + path gcs.Path pending bool - repeat time.Duration project string + repeat time.Duration secret string testgridConfig string + timeout time.Duration + maxFiles int } func (o *options) parse(flags *flag.FlagSet, args []string) error { @@ -89,8 +90,9 @@ func (o *options) parse(flags *flag.FlagSet, args []string) error { flags.BoolVar(&o.pending, "pending", false, "Include pending results when set (otherwise ignore them)") flags.StringVar(&o.project, "upload", "", "Upload results to specified gcp project instead of stdout") flags.StringVar(&o.secret, "secret", "", "Use the specified secret guid instead of randomly generating one.") - flags.DurationVar(&o.deadline, "deadline", 0, "Timeout after the specified deadling duration (use 0 for no deadline)") + flags.DurationVar(&o.timeout, "timeout", 0, "Timeout after the specified deadling duration (use 0 for no timeout)") flags.DurationVar(&o.repeat, "repeat", 0, "Repeatedly transfer after sleeping for this duration (exit after one run when 0)") + flags.IntVar(&o.maxFiles, "max-files", 10000, "Ceiling for number of artifact files (0 for unlimited, server may reject)") flags.Parse(args) return nil } @@ -146,11 +148,25 @@ func trailingSlash(s string) string { return s + "/" } +func bucketListChecker(buckets ...string) (bucketChecker, error) { + bucketNames := map[string]bool{} + for _, b := range buckets { + var path gcs.Path + if err := path.Set(b); err != nil { + return nil, fmt.Errorf("%q: %w", b, err) + } + bucketNames[path.Bucket()] = true + } + return func(_ context.Context, name string) bool { + return bucketNames[name] + }, nil +} + func run(opt options) error { var ctx context.Context var cancel context.CancelFunc - if opt.deadline > 0 { - ctx, cancel = 
+	if opt.timeout > 0 {
+		ctx, cancel = context.WithTimeout(context.Background(), opt.timeout)
 	} else {
 		ctx, cancel = context.WithCancel(context.Background())
 	}
@@ -181,7 +197,7 @@ func run(opt options) error {
 
 	// Should we just transfer a specific build?
 	if opt.path.Bucket() != "" { // All valid --build=gs://whatever values have a non-empty bucket.
-		return transferBuild(ctx, storageClient, rsClient, opt.project, opt.path, opt.override, true)
+		return transferBuild(ctx, storageClient, rsClient, opt.project, opt.path, opt.override, true, opt.maxFiles)
 	}
 
 	groups, err := findGroups(cfg, opt.jobs.Strings()...)
@@ -189,10 +205,32 @@ func run(opt options) error {
 		return fmt.Errorf("find groups: %v", err)
 	}
 
-	groups, err = filterBuckets(groups, opt.buckets.Strings()...)
+	var checkBucket bucketChecker
+	if len(opt.buckets.Strings()) > 0 {
+		var err error
+		if checkBucket, err = bucketListChecker(opt.buckets.Strings()...); err != nil {
+			return fmt.Errorf("parse bucket list: %w", err)
+		}
+	} else {
+		checkWritable := func(ctx context.Context, name string) bool {
+			const want = "storage.objects.create"
+			have, err := storageClient.Bucket(name).IAM().TestPermissions(ctx, []string{want})
+			if err != nil || len(have) != 1 || have[0] != want {
+				logrus.WithError(err).WithFields(logrus.Fields{"bucket": name, "want": want, "have": have}).Error("No write access")
+				return false
+			}
+			return true
+		}
+		checkBucket = checkWritable
+	}
+
+	groups, err = filterBuckets(ctx, checkBucket, groups...)
+	if err != nil {
+		return fmt.Errorf("filter buckets: %w", err)
+	}
 
 	logrus.Infof("Finding latest builds for %d groups...\n", len(groups))
 	buildsChan, buildsErrChan := findBuilds(ctx, storageClient, groups)
 	transferErrChan := transfer(ctx, storageClient, rsClient, opt, buildsChan)
 	select {
 	case <-ctx.Done():
@@ -213,25 +251,34 @@ func run(opt options) error {
 	return nil
 }
 
-func filterBuckets(groups []configpb.TestGroup, bucketPaths ...string) ([]configpb.TestGroup, error) {
-	if len(bucketPaths) == 0 {
-		return groups, nil
-	}
+type bucketChecker func(context.Context, string) bool
+
+func filterBuckets(parent context.Context, checkBucket bucketChecker, groups ...configpb.TestGroup) ([]configpb.TestGroup, error) {
 	buckets := map[string]bool{}
-	var path gcs.Path
-	for _, p := range bucketPaths {
-		if err := path.Set(p); err != nil {
-			return nil, fmt.Errorf("bad bucket: %w", err)
-		}
-		buckets[path.Bucket()] = true
+	valid := func(path gcs.Path) bool {
+		name := path.Bucket()
+		if good, ok := buckets[name]; ok {
+			return good
+		}
+
+		ctx, cancel := context.WithTimeout(parent, 10*time.Second)
+		defer cancel()
+		result := checkBucket(ctx, name)
+		buckets[name] = result
+		return result
 	}
 	var ret []configpb.TestGroup
+	var path gcs.Path
 	for _, g := range groups {
 		if err := path.Set("gs://" + g.GcsPrefix); err != nil {
 			return nil, fmt.Errorf("bad group prefix %s: %w", g.Name, err)
 		}
-		if !buckets[path.Bucket()] {
+		if !valid(path) {
+			logrus.WithFields(logrus.Fields{
+				"testgroup":  g.Name,
+				"gcs_prefix": "gs://" + g.GcsPrefix,
+			}).Info("Skip unwritable group")
 			continue
 		}
 		ret = append(ret, g)
 	}
@@ -258,7 +305,7 @@ func transfer(ctx context.Context, storageClient *storage.Client, rsClient *resu
 		wg.Add(1)
 		go func(info buildsInfo) {
 			defer wg.Done()
-			if err := transferLatest(ctx, storageClient, rsClient, opt.project, info.builds, opt.latest, opt.override, opt.pending); err != nil {
+			if err := transferLatest(ctx, storageClient, rsClient, opt.project, info.builds, opt.latest, opt.override, opt.pending, opt.maxFiles); err != nil {
 				logrus.WithError(err).Error("Transfer failed")
 				select {
 				case <-ctx.Done():
@@ -288,37 +335,6 @@ func transfer(ctx context.Context, storageClient *storage.Client, rsClient *resu
 	return retChan
 }
 
-type bucketChecker struct {
-	buckets map[string]bool
-	client  *storage.Client
-	lock    sync.RWMutex
-}
-
-func (bc *bucketChecker) writable(ctx context.Context, path gcs.Path) bool {
-	name := path.Bucket()
-	bc.lock.RLock()
-	writable, present := bc.buckets[name]
-	bc.lock.RUnlock()
-	if present {
-		return writable
-	}
-	bc.lock.Lock()
-	defer bc.lock.Unlock()
-	writable, present = bc.buckets[name]
-	if present {
-		return writable
-	}
-	const want = "storage.objects.create"
-	have, err := bc.client.Bucket(name).IAM().TestPermissions(ctx, []string{want})
-	if err != nil || len(have) != 1 || have[0] != want {
-		bc.buckets[name] = false
-		logrus.WithError(err).WithFields(logrus.Fields{"bucket": name, "want": want, "have": have}).Error("No write access")
-	} else {
-		bc.buckets[name] = true
-	}
-	return bc.buckets[name]
-}
-
 func findGroups(cfg *configpb.Configuration, jobs ...string) ([]configpb.TestGroup, error) {
 	var groups []configpb.TestGroup
 	for _, job := range jobs {
@@ -342,7 +358,7 @@ type buildsInfo struct {
 	builds []gcs.Build
 }
 
-func findGroupBuilds(ctx context.Context, storageClient *storage.Client, bc *bucketChecker, group configpb.TestGroup, buildsChan chan<- buildsInfo, errChan chan<- error) {
+func findGroupBuilds(ctx context.Context, storageClient *storage.Client, group configpb.TestGroup, buildsChan chan<- buildsInfo, errChan chan<- error) {
 	log := logrus.WithFields(logrus.Fields{
 		"testgroup":  group.Name,
 		"gcs_prefix": "gs://" + group.GcsPrefix,
@@ -358,10 +374,6 @@ func findGroupBuilds(ctx context.Context, storageClient *storage.Client, bc *buc
 		}
 		return
 	}
-	if !bc.writable(ctx, *tgPath) {
-		log.Debug("Skip unwritable group")
-		return
-	}
 
 	builds, err := gcs.ListBuilds(ctx, storageClient, *tgPath)
 	if err != nil {
@@ -384,13 +396,9 @@ func findGroupBuilds(ctx context.Context, storageClient *storage.Client, bc *buc
 	}
 }
 
 func findBuilds(ctx context.Context, storageClient *storage.Client, groups []configpb.TestGroup) (<-chan buildsInfo, <-chan error) {
 	buildsChan := make(chan buildsInfo)
 	errChan := make(chan error)
-	bc := bucketChecker{
-		buckets: map[string]bool{},
-		client:  storageClient,
-	}
 	go func() {
 		innerErrChan := make(chan error)
 		defer close(buildsChan)
@@ -403,7 +411,7 @@ func findBuilds(ctx context.Context, storageClient *storage.Client, groups []con
 			wg.Add(1)
 			go func(testGroup configpb.TestGroup) {
 				defer wg.Done()
-				findGroupBuilds(ctx, storageClient, &bc, testGroup, buildsChan, innerErrChan)
+				findGroupBuilds(ctx, storageClient, testGroup, buildsChan, innerErrChan)
 			}(testGroup)
 		}
 		go func() {
@@ -428,24 +436,24 @@ func findBuilds(ctx context.Context, storageClient *storage.Client, groups []con
 	return buildsChan, errChan
 }
 
-func transferLatest(ctx context.Context, storageClient *storage.Client, rsClient *resultstore.Client, project string, builds gcs.Builds, max int, override bool, includePending bool) error {
+func transferLatest(ctx context.Context, storageClient *storage.Client, rsClient *resultstore.Client, project string, builds gcs.Builds, max int, override bool, includePending bool, maxFiles int) error {
 
 	for i, build := range builds {
 		if i >= max {
 			break
 		}
 		path, err := gcs.NewPath(build.String())
 		if err != nil {
 			return fmt.Errorf("bad %s path: %v", build, err)
 		}
-		if err := transferBuild(ctx, storageClient, rsClient, project, *path, override, includePending); err != nil {
+		if err := transferBuild(ctx, storageClient, rsClient, project, *path, override, includePending, maxFiles); err != nil {
 			return fmt.Errorf("%s: %v", build, err)
 		}
 	}
 	return nil
 }
 
-func transferBuild(ctx context.Context, storageClient *storage.Client, rsClient *resultstore.Client, project string, path gcs.Path, override bool, includePending bool) error {
+func transferBuild(ctx context.Context, storageClient *storage.Client, rsClient *resultstore.Client, project string, path gcs.Path, override bool, includePending bool, maxFiles int) error {
 	build := gcs.Build{
 		Bucket:     storageClient.Bucket(path.Bucket()),
 		Prefix:     trailingSlash(path.Object()),
@@ -478,7 +486,7 @@ func transferBuild(ctx context.Context, storageClient *storage.Client, rsClient
 	desc := "Results of " + path.String()
 
 	log.Debug("Converting...")
-	inv, target, test := convert(project, desc, path, *result)
+	inv, target, test := convert(project, desc, path, *result, maxFiles)
 
 	if project == "" {
 		print(inv.To(), test.To())
diff --git a/experiment/resultstore/main_test.go b/experiment/resultstore/main_test.go
index b3f9d8cfa2b0..477865e6e1bf 100644
--- a/experiment/resultstore/main_test.go
+++ b/experiment/resultstore/main_test.go
@@ -18,6 +18,7 @@ limitations under the License.
 package main
 
 import (
+	"context"
 	"reflect"
 	"testing"
 
@@ -161,38 +162,6 @@ func TestFilterBuckets(t *testing.T) {
 		expected []configpb.TestGroup
 		err      bool
 	}{
-		{
-			name: "no paths works",
-			groups: []configpb.TestGroup{
-				{
-					Name: "foo",
-				},
-				{
-					Name: "bar",
-				},
-			},
-			expected: []configpb.TestGroup{
-				{
-					Name: "foo",
-				},
-				{
-					Name: "bar",
-				},
-			},
-		},
-		{
-			name: "bad paths error",
-			groups: []configpb.TestGroup{
-				{
-					Name: "foo",
-				},
-				{
-					Name: "bar",
-				},
-			},
-			paths: []string{"!!!://!!!"},
-			err:   true,
-		},
 		{
 			name: "bad groups error",
 			groups: []configpb.TestGroup{
@@ -231,7 +200,11 @@ func TestFilterBuckets(t *testing.T) {
 
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			actual, err := filterBuckets(tc.groups, tc.paths...)
+			checkBuckets, err := bucketListChecker(tc.paths...)
+			if err != nil {
+				t.Fatalf("create checker: %v", err)
+			}
+			actual, err := filterBuckets(context.Background(), checkBuckets, tc.groups...)
 			switch {
 			case err != nil:
 				if !tc.err {
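
For reference, the ceiling logic that convert() applies to the invocation and
test suite file lists reduces to the following self-contained sketch. The
file type and capFiles helper here are illustrative stand-ins, not part of
the resultstore package or of this patch:

	package main

	import "fmt"

	// file stands in for resultstore.File; only the ID matters here.
	type file struct{ ID string }

	// capFiles mirrors the pattern used in convert(): count each attached
	// file, skip the rest once the ceiling is reached, then append a single
	// marker entry so the truncation is visible on the invocation.
	func capFiles(artifacts []string, maxFiles int) []file {
		var files []file
		total := 0
		for _, a := range artifacts {
			if total >= maxFiles {
				continue // over the ceiling: drop, but keep scanning
			}
			total++
			files = append(files, file{ID: a})
		}
		if total >= maxFiles {
			// One marker entry stands in for everything that was dropped.
			files = append(files, file{ID: fmt.Sprintf("exceeded %d files", maxFiles)})
		}
		return files
	}

	func main() {
		// Four artifacts with a ceiling of two: prints [{a} {b} {exceeded 2 files}].
		fmt.Println(capFiles([]string{"a", "b", "c", "d"}, 2))
	}

Note the edge case the TODO in convert() calls out: when the count lands
exactly on the ceiling, the marker is appended even if nothing was dropped.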