Skip to content

Commit

Permalink
[BEAM-10976] Bundle finalization: Harness and some exec changes (#16980)
Browse files Browse the repository at this point in the history
* Bundle finalization harness side changes

* Add testing

* Iterate over pardos directly

* Track bundlefinalizer in plan.go not pardo

* Remove outdated test

* Fix pointer issue

* Update todos to reference jiras

* Cleanup from feedback

* Doc nit

Co-authored-by: Daniel Oliveira <younghoono@gmail.com>

* GetExpirationTime comment

Co-authored-by: github-actions <github-actions@github.com>
Co-authored-by: Daniel Oliveira <younghoono@gmail.com>
  • Loading branch information
3 people authored Mar 8, 2022
1 parent f0e14fb commit 95a5c26
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 43 deletions.
12 changes: 6 additions & 6 deletions sdks/go/pkg/beam/core/runtime/exec/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (n *Combine) Up(ctx context.Context) error {

n.states = metrics.NewPTransformState(n.PID)

if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil); err != nil {
if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil, nil); err != nil {
return n.fail(err)
}

Expand Down Expand Up @@ -107,7 +107,7 @@ func (n *Combine) mergeAccumulators(ctx context.Context, a, b interface{}) (inte
}

in := &MainInput{Key: FullValue{Elm: a}}
val, err := n.mergeInv.InvokeWithoutEventTime(ctx, in, b)
val, err := n.mergeInv.InvokeWithoutEventTime(ctx, in, nil, b)
if err != nil {
return nil, n.fail(errors.WithContext(err, "invoking MergeAccumulators"))
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func (n *Combine) Down(ctx context.Context) error {
}
n.status = Down

if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil); err != nil {
if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil, nil); err != nil {
n.err.TrySetError(err)
}
return n.err.Error()
Expand All @@ -230,7 +230,7 @@ func (n *Combine) newAccum(ctx context.Context, key interface{}) (interface{}, e
opt = &MainInput{Key: FullValue{Elm: key}}
}

val, err := n.createAccumInv.InvokeWithoutEventTime(ctx, opt)
val, err := n.createAccumInv.InvokeWithoutEventTime(ctx, opt, nil)
if err != nil {
return nil, n.fail(errors.WithContext(err, "invoking CreateAccumulator"))
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func (n *Combine) addInput(ctx context.Context, accum, key, value interface{}, t
}
v := n.aiValConvert(value)

val, err := n.addInputInv.InvokeWithoutEventTime(ctx, opt, v)
val, err := n.addInputInv.InvokeWithoutEventTime(ctx, opt, nil, v)
if err != nil {
return nil, n.fail(errors.WithContext(err, "invoking AddInput"))
}
Expand All @@ -287,7 +287,7 @@ func (n *Combine) extract(ctx context.Context, accum interface{}) (interface{},
return accum, nil
}

val, err := n.extractOutputInv.InvokeWithoutEventTime(ctx, nil, accum)
val, err := n.extractOutputInv.InvokeWithoutEventTime(ctx, nil, nil, accum)
if err != nil {
return nil, n.fail(errors.WithContext(err, "invoking ExtractOutput"))
}
Expand Down
53 changes: 43 additions & 10 deletions sdks/go/pkg/beam/core/runtime/exec/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
Expand All @@ -39,23 +40,47 @@ type MainInput struct {
RTracker sdf.RTracker
}

type bundleFinalizationCallback struct {
callback func() error
validUntil time.Time
}

// bundleFinalizer holds all the user defined callbacks to be run on bundle finalization.
// Implements typex.BundleFinalization
type bundleFinalizer struct {
callbacks []bundleFinalizationCallback
lastValidCallback time.Time // Used to track when we can safely gc the bundleFinalizer
}

// RegisterCallback is used to register callbacks during DoFn execution.
func (bf *bundleFinalizer) RegisterCallback(t time.Duration, cb func() error) {
callback := bundleFinalizationCallback{
callback: cb,
validUntil: time.Now().Add(t),
}
bf.callbacks = append(bf.callbacks, callback)
if bf.lastValidCallback.Before(callback.validUntil) {
bf.lastValidCallback = callback.validUntil
}
}

// Invoke invokes the fn with the given values. The extra values must match the non-main
// side input and emitters. It returns the direct output, if any.
func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error) {
func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) {
if fn == nil {
return nil, nil // ok: nothing to Invoke
}
inv := newInvoker(fn)
return inv.Invoke(ctx, pn, ws, ts, opt, extra...)
return inv.Invoke(ctx, pn, ws, ts, opt, bf, extra...)
}

// InvokeWithoutEventTime runs the given function at time 0 in the global window.
func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error) {
func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) {
if fn == nil {
return nil, nil // ok: nothing to Invoke
}
inv := newInvoker(fn)
return inv.InvokeWithoutEventTime(ctx, opt, extra...)
return inv.InvokeWithoutEventTime(ctx, opt, bf, extra...)
}

// invoker is a container struct for hot path invocations of DoFns, to avoid
Expand All @@ -64,9 +89,9 @@ type invoker struct {
fn *funcx.Fn
args []interface{}
// TODO(lostluck): 2018/07/06 consider replacing with a slice of functions to run over the args slice, as an improvement.
ctxIdx, pnIdx, wndIdx, etIdx int // specialized input indexes
outEtIdx, outErrIdx int // specialized output indexes
in, out []int // general indexes
ctxIdx, pnIdx, wndIdx, etIdx, bfIdx int // specialized input indexes
outEtIdx, outErrIdx int // specialized output indexes
in, out []int // general indexes

ret FullValue // ret is a cached allocation for passing to the next Unit. Units never modify the passed in FullValue.
elmConvert, elm2Convert func(interface{}) interface{} // Cached conversion functions, which assums this invoker is always used with the same parameter types.
Expand Down Expand Up @@ -99,6 +124,11 @@ func newInvoker(fn *funcx.Fn) *invoker {
if n.outErrIdx, ok = fn.Error(); !ok {
n.outErrIdx = -1
}
// TODO(BEAM-10976) - add this back in once BundleFinalization is implemented
// if n.bfIdx, ok = fn.BundleFinalization(); !ok {
// n.bfIdx = -1
// }
n.bfIdx = -1

n.initCall()

Expand All @@ -115,13 +145,13 @@ func (n *invoker) Reset() {
}

// InvokeWithoutEventTime runs the function at time 0 in the global window.
func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput, extra ...interface{}) (*FullValue, error) {
return n.Invoke(ctx, typex.NoFiringPane(), window.SingleGlobalWindow, mtime.ZeroTimestamp, opt, extra...)
func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) {
return n.Invoke(ctx, typex.NoFiringPane(), window.SingleGlobalWindow, mtime.ZeroTimestamp, opt, bf, extra...)
}

// Invoke invokes the fn with the given values. The extra values must match the non-main
// side input and emitters. It returns the direct output, if any.
func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opt *MainInput, extra ...interface{}) (*FullValue, error) {
func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) {
// (1) Populate contexts
// extract these to make things easier to read.
args := n.args
Expand All @@ -143,6 +173,9 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
if n.etIdx >= 0 {
args[n.etIdx] = ts
}
if n.bfIdx >= 0 {
args[n.bfIdx] = bf
}

// (2) Main input from value, if any.
i := 0
Expand Down
62 changes: 55 additions & 7 deletions sdks/go/pkg/beam/core/runtime/exec/fn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package exec

import (
"context"
"errors"
"fmt"
"reflect"
"testing"
Expand Down Expand Up @@ -178,7 +179,7 @@ func TestInvoke(t *testing.T) {
test.ExpectedTime = ts
}

val, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...)
val, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, nil, test.Args...)
if err != nil {
t.Fatalf("Invoke(%v,%v) failed: %v", fn.Fn.Name(), test.Args, err)
}
Expand All @@ -195,6 +196,53 @@ func TestInvoke(t *testing.T) {
}
}

func TestRegisterCallback(t *testing.T) {
bf := bundleFinalizer{
callbacks: []bundleFinalizationCallback{},
lastValidCallback: time.Now(),
}
testVar := 0
bf.RegisterCallback(500*time.Minute, func() error {
testVar += 5
return nil
})
bf.RegisterCallback(2*time.Minute, func() error {
testVar = 25
return nil
})
callbackErr := errors.New("Callback error")
bf.RegisterCallback(2*time.Minute, func() error {
return callbackErr
})

// We can't do exact equality since this relies on real time, we'll give it a broad range
if bf.lastValidCallback.Before(time.Now().Add(400*time.Minute)) || bf.lastValidCallback.After(time.Now().Add(600*time.Minute)) {
t.Errorf("RegisterCallback() lastValidCallback set to %v, want about 500 minutes", bf.lastValidCallback)
}
if got, want := len(bf.callbacks), 3; got != want {
t.Fatalf("Callbacks in bundleFinalizer does not match number of calls to RegisterCallback(), got %v callbacks, want %v", got, want)
}

callbackIdx := 0
if err := bf.callbacks[callbackIdx].callback(); err != nil {
t.Errorf("RegisterCallback() callback at index %v returned unexpected error: %v", callbackIdx, err)
}
if got, want := testVar, 5; got != want {
t.Errorf("RegisterCallback() callback at index %v set testvar to %v, want %v", callbackIdx, got, want)
}
callbackIdx = 1
if err := bf.callbacks[callbackIdx].callback(); err != nil {
t.Errorf("RegisterCallback() callback at index %v returned error %v, want nil", callbackIdx, err)
}
if got, want := testVar, 25; got != want {
t.Errorf("RegisterCallback() callback at index %v set testvar to %v, want %v", callbackIdx, got, want)
}
callbackIdx = 2
if err := bf.callbacks[2].callback(); err != callbackErr {
t.Errorf("RegisterCallback() callback at index %v returned error %v, want %v", callbackIdx, err, callbackErr)
}
}

// Benchmarks

// Invoke is implemented as a single use of a cached invoker, so a measure of
Expand Down Expand Up @@ -314,7 +362,7 @@ func BenchmarkInvoke(b *testing.B) {
ts := mtime.ZeroTimestamp.Add(2 * time.Millisecond)
b.Run(fmt.Sprintf("SingleInvoker_%s", test.Name), func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...)
_, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, nil, test.Args...)
if err != nil {
b.Fatalf("Invoke(%v,%v) failed: %v", fn.Fn.Name(), test.Args, err)
}
Expand All @@ -323,7 +371,7 @@ func BenchmarkInvoke(b *testing.B) {
b.Run(fmt.Sprintf("CachedInvoker_%s", test.Name), func(b *testing.B) {
inv := newInvoker(fn)
for i := 0; i < b.N; i++ {
_, err := inv.Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, test.Opt, test.Args...)
_, err := inv.Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, test.Opt, nil, test.Args...)
if err != nil {
b.Fatalf("Invoke(%v,%v) failed: %v", fn.Fn.Name(), test.Args, err)
}
Expand Down Expand Up @@ -416,7 +464,7 @@ func BenchmarkInvokeCall(b *testing.B) {
ctx := context.Background()
n := 0
for i := 0; i < b.N; i++ {
ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}})
ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}}, nil)
n = ret.Elm.(int)
}
b.Log(n)
Expand All @@ -427,7 +475,7 @@ func BenchmarkInvokeCallExtra(b *testing.B) {
ctx := context.Background()
n := 0
for i := 0; i < b.N; i++ {
ret, _ := InvokeWithoutEventTime(ctx, fn, nil, n)
ret, _ := InvokeWithoutEventTime(ctx, fn, nil, nil, n)
n = ret.Elm.(int)
}
b.Log(n)
Expand All @@ -453,7 +501,7 @@ func BenchmarkInvokeFnCall(b *testing.B) {
ctx := context.Background()
n := 0
for i := 0; i < b.N; i++ {
ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}})
ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}}, nil)
n = ret.Elm.(int)
}
b.Log(n)
Expand All @@ -464,7 +512,7 @@ func BenchmarkInvokeFnCallExtra(b *testing.B) {
ctx := context.Background()
n := 0
for i := 0; i < b.N; i++ {
ret, _ := InvokeWithoutEventTime(ctx, fn, nil, n)
ret, _ := InvokeWithoutEventTime(ctx, fn, nil, nil, n)
n = ret.Elm.(int)
}
b.Log(n)
Expand Down
9 changes: 5 additions & 4 deletions sdks/go/pkg/beam/core/runtime/exec/pardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ParDo struct {
emitters []ReusableEmitter
ctx context.Context
inv *invoker
bf *bundleFinalizer

reader StateReader
cache *cacheElm
Expand Down Expand Up @@ -83,7 +84,7 @@ func (n *ParDo) Up(ctx context.Context) error {
// Subsequent bundles might run this same node, and the context here would be
// incorrectly refering to the older bundleId.
setupCtx := metrics.SetPTransformID(ctx, n.PID)
if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil); err != nil {
if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil, nil); err != nil {
return n.fail(err)
}

Expand Down Expand Up @@ -229,7 +230,7 @@ func (n *ParDo) Down(ctx context.Context) error {
n.reader = nil
n.cache = nil

if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil); err != nil {
if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil, nil); err != nil {
n.err.TrySetError(err)
}
return n.err.Error()
Expand Down Expand Up @@ -295,7 +296,7 @@ func (n *ParDo) invokeDataFn(ctx context.Context, pn typex.PaneInfo, ws []typex.
if err := n.preInvoke(ctx, ws, ts); err != nil {
return nil, err
}
val, err = Invoke(ctx, pn, ws, ts, fn, opt, n.cache.extra...)
val, err = Invoke(ctx, pn, ws, ts, fn, opt, n.bf, n.cache.extra...)
if err != nil {
return nil, err
}
Expand All @@ -313,7 +314,7 @@ func (n *ParDo) invokeProcessFn(ctx context.Context, pn typex.PaneInfo, ws []typ
if err := n.preInvoke(ctx, ws, ts); err != nil {
return nil, err
}
val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, n.cache.extra...)
val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, n.bf, n.cache.extra...)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 95a5c26

Please sign in to comment.