From d3269a214eeabb2744631d821edb15af99b64240 Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Wed, 27 Mar 2024 11:30:14 -0700 Subject: [PATCH 1/2] Fix stateful expressions in UDFs This commit fixes an issue with using aggregation expressions user-defined functions where there wasn't a separate state per textual invocation. Closes #5092 --- compiler/kernel/expr.go | 29 +++++++++++++- compiler/kernel/op.go | 38 ++++++------------- .../sam/expr/ztests/udf-stateful-expr.yaml | 8 ++++ 3 files changed, 47 insertions(+), 28 deletions(-) create mode 100644 runtime/sam/expr/ztests/udf-stateful-expr.yaml diff --git a/compiler/kernel/expr.go b/compiler/kernel/expr.go index 0f5659f8af..17c6726afe 100644 --- a/compiler/kernel/expr.go +++ b/compiler/kernel/expr.go @@ -315,8 +315,13 @@ func (b *Builder) compileCall(call dag.Call) (expr.Evaluator, error) { var path field.Path // First check if call is to a user defined function, otherwise check for // builtin function. - fn, ok := b.funcs[call.Name] - if !ok { + var fn expr.Function + if e, ok := b.udfs[call.Name]; ok { + var err error + if fn, err = b.compileUDFCall(call.Name, e); err != nil { + return nil, err + } + } else { var err error fn, path, err = function.New(b.zctx(), call.Name, len(call.Args)) if err != nil { @@ -335,6 +340,26 @@ func (b *Builder) compileCall(call dag.Call) (expr.Evaluator, error) { return expr.NewCall(b.zctx(), fn, exprs), nil } +func (b *Builder) compileUDFCall(name string, body dag.Expr) (expr.Function, error) { + if b.udfStack == nil { + // If udfStack is nil this means we are entering a udf invocation. We + // will store compiled udf calls in builder.udfStack in order to avoid + // infinite recursion when encountering recursive udf calls. + b.udfStack = make(map[string]*expr.UDF) + defer func() { b.udfStack = nil }() + } + if fn, ok := b.udfStack[name]; ok { + return fn, nil + } + fn := &expr.UDF{} + b.udfStack[name] = fn + var err error + if fn.Body, err = b.compileExpr(body); err != nil { + return nil, err + } + return fn, nil +} + func (b *Builder) compileMapCall(a *dag.MapCall) (expr.Evaluator, error) { e, err := b.compileExpr(a.Expr) if err != nil { diff --git a/compiler/kernel/op.go b/compiler/kernel/op.go index 018210b503..2fe2dd1c0a 100644 --- a/compiler/kernel/op.go +++ b/compiler/kernel/op.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "maps" "slices" "strings" "sync" @@ -57,7 +58,8 @@ type Builder struct { progress *zbuf.Progress arena *zed.Arena // For zed.Values created during compilation. deletes *sync.Map - funcs map[string]expr.Function + udfs map[string]dag.Expr + udfStack map[string]*expr.UDF resetters expr.Resetters } @@ -75,7 +77,7 @@ func NewBuilder(rctx *runtime.Context, source *data.Source) *Builder { RecordsMatched: 0, }, arena: arena, - funcs: make(map[string]expr.Function), + udfs: make(map[string]dag.Expr), } } @@ -459,11 +461,14 @@ func (b *Builder) compileSeq(seq dag.Seq, parents []zbuf.Puller) ([]zbuf.Puller, } func (b *Builder) compileScope(scope *dag.Scope, parents []zbuf.Puller) ([]zbuf.Puller, error) { - // XXX We need to fix how udfs are compiled since there is currently a bug - // where aggregation expressions in udfs do not have separate state per - // invocation. - if err := b.compileFuncs(scope.Funcs); err != nil { - return nil, err + // Because there can be name collisions between a child and parent scope + // we clone the current udf map, populate the cloned map, then restore the + // old scope once the current scope has been built. + parentUDFs := b.udfs + b.udfs = maps.Clone(parentUDFs) + defer func() { b.udfs = parentUDFs }() + for _, f := range scope.Funcs { + b.udfs[f.Name] = f.Expr } return b.compileSeq(scope.Body, parents) } @@ -510,25 +515,6 @@ func (b *Builder) compileScatter(par *dag.Scatter, parents []zbuf.Puller) ([]zbu return ops, nil } -func (b *Builder) compileFuncs(fns []*dag.Func) error { - udfs := make([]*expr.UDF, 0, len(fns)) - for _, f := range fns { - if _, ok := b.funcs[f.Name]; ok { - return fmt.Errorf("internal error: func %q declared twice", f.Name) - } - u := &expr.UDF{} - b.funcs[f.Name] = u - udfs = append(udfs, u) - } - for i := range fns { - var err error - if udfs[i].Body, err = b.compileExpr(fns[i].Expr); err != nil { - return err - } - } - return nil -} - func (b *Builder) compileExprSwitch(swtch *dag.Switch, parents []zbuf.Puller) ([]zbuf.Puller, error) { parent := parents[0] if len(parents) > 1 { diff --git a/runtime/sam/expr/ztests/udf-stateful-expr.yaml b/runtime/sam/expr/ztests/udf-stateful-expr.yaml new file mode 100644 index 0000000000..aa8433c414 --- /dev/null +++ b/runtime/sam/expr/ztests/udf-stateful-expr.yaml @@ -0,0 +1,8 @@ +zed: | + func c(): ( count() ) + yield [c(),c(),c()] + +input: 'null' + +output: | + [1(uint64),1(uint64),1(uint64)] From a19f14a1cd336d2057630f764defc934fa0fe9ce Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Wed, 19 Jun 2024 10:43:48 -0700 Subject: [PATCH 2/2] Fixes --- compiler/kernel/expr.go | 14 ++++------- compiler/kernel/op.go | 25 ++++++++++--------- .../sam/expr/ztests/udf-stateful-expr.yaml | 7 +++--- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/compiler/kernel/expr.go b/compiler/kernel/expr.go index 17c6726afe..aa7c50b03b 100644 --- a/compiler/kernel/expr.go +++ b/compiler/kernel/expr.go @@ -341,22 +341,18 @@ func (b *Builder) compileCall(call dag.Call) (expr.Evaluator, error) { } func (b *Builder) compileUDFCall(name string, body dag.Expr) (expr.Function, error) { - if b.udfStack == nil { - // If udfStack is nil this means we are entering a udf invocation. We - // will store compiled udf calls in builder.udfStack in order to avoid - // infinite recursion when encountering recursive udf calls. - b.udfStack = make(map[string]*expr.UDF) - defer func() { b.udfStack = nil }() - } - if fn, ok := b.udfStack[name]; ok { + if fn, ok := b.compiledUDFs[name]; ok { return fn, nil } fn := &expr.UDF{} - b.udfStack[name] = fn + // We store compiled UDF calls here so as to avoid stack overflows on + // recursive calls. + b.compiledUDFs[name] = fn var err error if fn.Body, err = b.compileExpr(body); err != nil { return nil, err } + delete(b.compiledUDFs, name) return fn, nil } diff --git a/compiler/kernel/op.go b/compiler/kernel/op.go index 2fe2dd1c0a..313f7c3419 100644 --- a/compiler/kernel/op.go +++ b/compiler/kernel/op.go @@ -51,16 +51,16 @@ import ( var ErrJoinParents = errors.New("join requires two upstream parallel query paths") type Builder struct { - rctx *runtime.Context - mctx *zed.Context - source *data.Source - readers []zio.Reader - progress *zbuf.Progress - arena *zed.Arena // For zed.Values created during compilation. - deletes *sync.Map - udfs map[string]dag.Expr - udfStack map[string]*expr.UDF - resetters expr.Resetters + rctx *runtime.Context + mctx *zed.Context + source *data.Source + readers []zio.Reader + progress *zbuf.Progress + arena *zed.Arena // For zed.Values created during compilation. + deletes *sync.Map + udfs map[string]dag.Expr + compiledUDFs map[string]*expr.UDF + resetters expr.Resetters } func NewBuilder(rctx *runtime.Context, source *data.Source) *Builder { @@ -76,8 +76,9 @@ func NewBuilder(rctx *runtime.Context, source *data.Source) *Builder { RecordsRead: 0, RecordsMatched: 0, }, - arena: arena, - udfs: make(map[string]dag.Expr), + arena: arena, + udfs: make(map[string]dag.Expr), + compiledUDFs: make(map[string]*expr.UDF), } } diff --git a/runtime/sam/expr/ztests/udf-stateful-expr.yaml b/runtime/sam/expr/ztests/udf-stateful-expr.yaml index aa8433c414..b10cdd318b 100644 --- a/runtime/sam/expr/ztests/udf-stateful-expr.yaml +++ b/runtime/sam/expr/ztests/udf-stateful-expr.yaml @@ -1,8 +1,9 @@ zed: | - func c(): ( count() ) - yield [c(),c(),c()] + func c1(): ( count() ) + func c2(): ( c1()+c1() ) + yield [c2(),c2(),c2()] input: 'null' output: | - [1(uint64),1(uint64),1(uint64)] + [2(uint64),2(uint64),2(uint64)]