Skip to content

Commit

Permalink
fix: reapply this commit:
Browse files Browse the repository at this point in the history
Inject Task's LatestSuccess Timestamp In Flux Extern (#19402)

* feat(task): Inject latest success/failure into extern.

* chore(task/backend): Don't specify an extern if there are no statements.

* chore(task/executor): Don't apply the latest failure for now.

* chore(changelog): Add 19402 to changelog.

* chore(kit): Introduce feature flag for time injection.

* chore(task/executor): Guard injection into extern by feature flag.

* chore(task/executor): No need for this subtest pattern.

* chore(task/executor): Add tests for extern injection.
  • Loading branch information
brettbuddin authored and Christopher Wolff committed Sep 16, 2020
1 parent d8947b4 commit 7f4300a
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 14 deletions.
6 changes: 6 additions & 0 deletions flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,9 @@
key: enforceOrgDashboardLimits
default: false
contact: Compute Team

- name: Inject Latest Success Time
description: Inject the latest successful task run timestamp into a Task query extern when executing.
key: injectLatestSuccessTime
default: false
contact: Compute Team
16 changes: 16 additions & 0 deletions kit/feature/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions task/backend/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ const (
maxPromises = 1000
defaultMaxWorkers = 100

lastSuccessOption = "tasks.lastSuccessTime"
latestSuccessOption = "tasks.latestSuccessTime"
latestFailureOption = "tasks.latestFailureTime"
)

var _ scheduler.Executor = (*Executor)(nil)
Expand Down Expand Up @@ -89,7 +90,7 @@ func (ts CompilerBuilderTimestamps) Extern() *ast.File {
if !ts.LatestSuccess.IsZero() {
body = append(body, &ast.OptionStatement{
Assignment: &ast.VariableAssignment{
ID: &ast.Identifier{Name: lastSuccessOption},
ID: &ast.Identifier{Name: latestSuccessOption},
Init: &ast.DateTimeLiteral{
Value: ts.LatestSuccess,
},
Expand Down
2 changes: 1 addition & 1 deletion task/backend/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestTaskExecutor_QuerySuccessWithExternInjection(t *testing.T) {
extern := &ast.File{
Body: []ast.Statement{&ast.OptionStatement{
Assignment: &ast.VariableAssignment{
ID: &ast.Identifier{Name: "tasks.lastSuccessTime"},
ID: &ast.Identifier{Name: latestSuccessOption},
Init: &ast.DateTimeLiteral{
Value: latestSuccess,
},
Expand Down
34 changes: 23 additions & 11 deletions task/backend/executor/support_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"time"

"github.com/influxdata/flux"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/runtime"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
Expand All @@ -30,14 +32,24 @@ type fakeQueryService struct {

var _ query.AsyncQueryService = (*fakeQueryService)(nil)

func makeAST(q string) lang.ASTCompiler {
pkg, err := flux.Parse(q)
func makeAST(q string, extern *ast.File) lang.ASTCompiler {
pkg, err := runtime.ParseToJSON(q)
if err != nil {
panic(err)
}

var externBytes []byte
if extern != nil && len(extern.Body) > 0 {
var err error
externBytes, err = json.Marshal(extern)
if err != nil {
panic(err)
}
}
return lang.ASTCompiler{
AST: pkg,
Now: time.Unix(123, 0),
AST: pkg,
Now: time.Unix(123, 0),
Extern: externBytes,
}
}

Expand Down Expand Up @@ -84,12 +96,12 @@ func (s *fakeQueryService) Query(ctx context.Context, req *query.Request) (flux.
}

// SucceedQuery allows the running query matching the given script to return on its Ready channel.
func (s *fakeQueryService) SucceedQuery(script string) {
func (s *fakeQueryService) SucceedQuery(script string, extern *ast.File) {
s.mu.Lock()
defer s.mu.Unlock()

// Unblock the flux.
ast := makeAST(script)
ast := makeAST(script, extern)
spec := makeASTString(ast)
fq, ok := s.queries[spec]
if !ok {
Expand All @@ -102,12 +114,12 @@ func (s *fakeQueryService) SucceedQuery(script string) {
}

// FailQuery closes the running query's Ready channel and sets its error to the given value.
func (s *fakeQueryService) FailQuery(script string, forced error) {
func (s *fakeQueryService) FailQuery(script string, extern *ast.File, forced error) {
s.mu.Lock()
defer s.mu.Unlock()

// Unblock the flux.
ast := makeAST(script)
ast := makeAST(script, nil)
spec := makeASTString(ast)
fq, ok := s.queries[spec]
if !ok {
Expand All @@ -128,12 +140,12 @@ func (s *fakeQueryService) FailNextQuery(forced error) {
// WaitForQueryLive ensures that the query has made it into the service.
// This is particularly useful for the synchronous executor,
// because the execution starts on a separate goroutine.
func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string) {
func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string, extern *ast.File) {
t.Helper()

const attempts = 10
ast := makeAST(script)
astUTC := makeAST(script)
ast := makeAST(script, extern)
astUTC := makeAST(script, extern)
astUTC.Now = ast.Now.UTC()
spec := makeASTString(ast)
specUTC := makeASTString(astUTC)
Expand Down

0 comments on commit 7f4300a

Please sign in to comment.