From 95a5c26c4c6d0a1c4155ad209b61e623781c47df Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 8 Mar 2022 14:35:41 -0500 Subject: [PATCH] [BEAM-10976] Bundle finalization: Harness and some exec changes (#16980) * Bundle finalization harness side changes * Add testing * Iterate over pardos directly * Track bundlefinalizer in plan.go not pardo * Remove outdated test * Fix pointer issue * Update todos to reference jiras * Cleanup from feedback * Doc nit Co-authored-by: Daniel Oliveira * GetExpirationTime comment Co-authored-by: github-actions Co-authored-by: Daniel Oliveira --- sdks/go/pkg/beam/core/runtime/exec/combine.go | 12 +-- sdks/go/pkg/beam/core/runtime/exec/fn.go | 53 +++++++++-- sdks/go/pkg/beam/core/runtime/exec/fn_test.go | 62 +++++++++++-- sdks/go/pkg/beam/core/runtime/exec/pardo.go | 9 +- sdks/go/pkg/beam/core/runtime/exec/plan.go | 50 ++++++++++ sdks/go/pkg/beam/core/runtime/exec/util.go | 2 +- .../pkg/beam/core/runtime/harness/harness.go | 92 ++++++++++++++++--- sdks/go/pkg/beam/runners/direct/buffer.go | 1 - 8 files changed, 238 insertions(+), 43 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go b/sdks/go/pkg/beam/core/runtime/exec/combine.go index 07dbb2f9a84dc..692f3f4647102 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/combine.go +++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go @@ -74,7 +74,7 @@ func (n *Combine) Up(ctx context.Context) error { n.states = metrics.NewPTransformState(n.PID) - if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil); err != nil { + if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil, nil); err != nil { return n.fail(err) } @@ -107,7 +107,7 @@ func (n *Combine) mergeAccumulators(ctx context.Context, a, b interface{}) (inte } in := &MainInput{Key: FullValue{Elm: a}} - val, err := n.mergeInv.InvokeWithoutEventTime(ctx, in, b) + val, err := n.mergeInv.InvokeWithoutEventTime(ctx, in, nil, b) if err != nil { return nil, n.fail(errors.WithContext(err, "invoking 
MergeAccumulators")) } @@ -213,7 +213,7 @@ func (n *Combine) Down(ctx context.Context) error { } n.status = Down - if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil); err != nil { + if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil, nil); err != nil { n.err.TrySetError(err) } return n.err.Error() @@ -230,7 +230,7 @@ func (n *Combine) newAccum(ctx context.Context, key interface{}) (interface{}, e opt = &MainInput{Key: FullValue{Elm: key}} } - val, err := n.createAccumInv.InvokeWithoutEventTime(ctx, opt) + val, err := n.createAccumInv.InvokeWithoutEventTime(ctx, opt, nil) if err != nil { return nil, n.fail(errors.WithContext(err, "invoking CreateAccumulator")) } @@ -273,7 +273,7 @@ func (n *Combine) addInput(ctx context.Context, accum, key, value interface{}, t } v := n.aiValConvert(value) - val, err := n.addInputInv.InvokeWithoutEventTime(ctx, opt, v) + val, err := n.addInputInv.InvokeWithoutEventTime(ctx, opt, nil, v) if err != nil { return nil, n.fail(errors.WithContext(err, "invoking AddInput")) } @@ -287,7 +287,7 @@ func (n *Combine) extract(ctx context.Context, accum interface{}) (interface{}, return accum, nil } - val, err := n.extractOutputInv.InvokeWithoutEventTime(ctx, nil, accum) + val, err := n.extractOutputInv.InvokeWithoutEventTime(ctx, nil, nil, accum) if err != nil { return nil, n.fail(errors.WithContext(err, "invoking ExtractOutput")) } diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go index 456edeba48387..5f90bdf1d7745 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fn.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "reflect" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" @@ -39,23 +40,47 @@ type MainInput struct { RTracker sdf.RTracker } +type bundleFinalizationCallback struct { + callback func() error + validUntil time.Time +} + +// bundleFinalizer holds 
all the user defined callbacks to be run on bundle finalization. +// Implements typex.BundleFinalization +type bundleFinalizer struct { + callbacks []bundleFinalizationCallback + lastValidCallback time.Time // Used to track when we can safely gc the bundleFinalizer +} + +// RegisterCallback is used to register callbacks during DoFn execution. +func (bf *bundleFinalizer) RegisterCallback(t time.Duration, cb func() error) { + callback := bundleFinalizationCallback{ + callback: cb, + validUntil: time.Now().Add(t), + } + bf.callbacks = append(bf.callbacks, callback) + if bf.lastValidCallback.Before(callback.validUntil) { + bf.lastValidCallback = callback.validUntil + } +} + // Invoke invokes the fn with the given values. The extra values must match the non-main // side input and emitters. It returns the direct output, if any. -func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error) { +func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) { if fn == nil { return nil, nil // ok: nothing to Invoke } inv := newInvoker(fn) - return inv.Invoke(ctx, pn, ws, ts, opt, extra...) + return inv.Invoke(ctx, pn, ws, ts, opt, bf, extra...) } // InvokeWithoutEventTime runs the given function at time 0 in the global window. -func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error) { +func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) { if fn == nil { return nil, nil // ok: nothing to Invoke } inv := newInvoker(fn) - return inv.InvokeWithoutEventTime(ctx, opt, extra...) + return inv.InvokeWithoutEventTime(ctx, opt, bf, extra...) 
} // invoker is a container struct for hot path invocations of DoFns, to avoid @@ -64,9 +89,9 @@ type invoker struct { fn *funcx.Fn args []interface{} // TODO(lostluck): 2018/07/06 consider replacing with a slice of functions to run over the args slice, as an improvement. - ctxIdx, pnIdx, wndIdx, etIdx int // specialized input indexes - outEtIdx, outErrIdx int // specialized output indexes - in, out []int // general indexes + ctxIdx, pnIdx, wndIdx, etIdx, bfIdx int // specialized input indexes + outEtIdx, outErrIdx int // specialized output indexes + in, out []int // general indexes ret FullValue // ret is a cached allocation for passing to the next Unit. Units never modify the passed in FullValue. elmConvert, elm2Convert func(interface{}) interface{} // Cached conversion functions, which assums this invoker is always used with the same parameter types. @@ -99,6 +124,11 @@ func newInvoker(fn *funcx.Fn) *invoker { if n.outErrIdx, ok = fn.Error(); !ok { n.outErrIdx = -1 } + // TODO(BEAM-10976) - add this back in once BundleFinalization is implemented + // if n.bfIdx, ok = fn.BundleFinalization(); !ok { + // n.bfIdx = -1 + // } + n.bfIdx = -1 n.initCall() @@ -115,13 +145,13 @@ func (n *invoker) Reset() { } // InvokeWithoutEventTime runs the function at time 0 in the global window. -func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput, extra ...interface{}) (*FullValue, error) { - return n.Invoke(ctx, typex.NoFiringPane(), window.SingleGlobalWindow, mtime.ZeroTimestamp, opt, extra...) +func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) { + return n.Invoke(ctx, typex.NoFiringPane(), window.SingleGlobalWindow, mtime.ZeroTimestamp, opt, bf, extra...) } // Invoke invokes the fn with the given values. The extra values must match the non-main // side input and emitters. It returns the direct output, if any. 
-func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opt *MainInput, extra ...interface{}) (*FullValue, error) { +func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, opt *MainInput, bf *bundleFinalizer, extra ...interface{}) (*FullValue, error) { // (1) Populate contexts // extract these to make things easier to read. args := n.args @@ -143,6 +173,9 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind if n.etIdx >= 0 { args[n.etIdx] = ts } + if n.bfIdx >= 0 { + args[n.bfIdx] = bf + } // (2) Main input from value, if any. i := 0 diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go index b7ab50108296c..b4db872395c6e 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go @@ -17,6 +17,7 @@ package exec import ( "context" + "errors" "fmt" "reflect" "testing" @@ -178,7 +179,7 @@ func TestInvoke(t *testing.T) { test.ExpectedTime = ts } - val, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...) + val, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, nil, test.Args...) 
if err != nil { t.Fatalf("Invoke(%v,%v) failed: %v", fn.Fn.Name(), test.Args, err) } @@ -195,6 +196,53 @@ func TestInvoke(t *testing.T) { } } +func TestRegisterCallback(t *testing.T) { + bf := bundleFinalizer{ + callbacks: []bundleFinalizationCallback{}, + lastValidCallback: time.Now(), + } + testVar := 0 + bf.RegisterCallback(500*time.Minute, func() error { + testVar += 5 + return nil + }) + bf.RegisterCallback(2*time.Minute, func() error { + testVar = 25 + return nil + }) + callbackErr := errors.New("Callback error") + bf.RegisterCallback(2*time.Minute, func() error { + return callbackErr + }) + + // We can't do exact equality since this relies on real time, we'll give it a broad range + if bf.lastValidCallback.Before(time.Now().Add(400*time.Minute)) || bf.lastValidCallback.After(time.Now().Add(600*time.Minute)) { + t.Errorf("RegisterCallback() lastValidCallback set to %v, want about 500 minutes", bf.lastValidCallback) + } + if got, want := len(bf.callbacks), 3; got != want { + t.Fatalf("Callbacks in bundleFinalizer does not match number of calls to RegisterCallback(), got %v callbacks, want %v", got, want) + } + + callbackIdx := 0 + if err := bf.callbacks[callbackIdx].callback(); err != nil { + t.Errorf("RegisterCallback() callback at index %v returned unexpected error: %v", callbackIdx, err) + } + if got, want := testVar, 5; got != want { + t.Errorf("RegisterCallback() callback at index %v set testvar to %v, want %v", callbackIdx, got, want) + } + callbackIdx = 1 + if err := bf.callbacks[callbackIdx].callback(); err != nil { + t.Errorf("RegisterCallback() callback at index %v returned error %v, want nil", callbackIdx, err) + } + if got, want := testVar, 25; got != want { + t.Errorf("RegisterCallback() callback at index %v set testvar to %v, want %v", callbackIdx, got, want) + } + callbackIdx = 2 + if err := bf.callbacks[2].callback(); err != callbackErr { + t.Errorf("RegisterCallback() callback at index %v returned error %v, want %v", callbackIdx, err, 
callbackErr) + } +} + // Benchmarks // Invoke is implemented as a single use of a cached invoker, so a measure of @@ -314,7 +362,7 @@ func BenchmarkInvoke(b *testing.B) { ts := mtime.ZeroTimestamp.Add(2 * time.Millisecond) b.Run(fmt.Sprintf("SingleInvoker_%s", test.Name), func(b *testing.B) { for i := 0; i < b.N; i++ { - _, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...) + _, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, nil, test.Args...) if err != nil { b.Fatalf("Invoke(%v,%v) failed: %v", fn.Fn.Name(), test.Args, err) } @@ -323,7 +371,7 @@ func BenchmarkInvoke(b *testing.B) { b.Run(fmt.Sprintf("CachedInvoker_%s", test.Name), func(b *testing.B) { inv := newInvoker(fn) for i := 0; i < b.N; i++ { - _, err := inv.Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, test.Opt, test.Args...) + _, err := inv.Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, test.Opt, nil, test.Args...) 
if err != nil { b.Fatalf("Invoke(%v,%v) failed: %v", fn.Fn.Name(), test.Args, err) } @@ -416,7 +464,7 @@ func BenchmarkInvokeCall(b *testing.B) { ctx := context.Background() n := 0 for i := 0; i < b.N; i++ { - ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}}) + ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}}, nil) n = ret.Elm.(int) } b.Log(n) @@ -427,7 +475,7 @@ func BenchmarkInvokeCallExtra(b *testing.B) { ctx := context.Background() n := 0 for i := 0; i < b.N; i++ { - ret, _ := InvokeWithoutEventTime(ctx, fn, nil, n) + ret, _ := InvokeWithoutEventTime(ctx, fn, nil, nil, n) n = ret.Elm.(int) } b.Log(n) @@ -453,7 +501,7 @@ func BenchmarkInvokeFnCall(b *testing.B) { ctx := context.Background() n := 0 for i := 0; i < b.N; i++ { - ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}}) + ret, _ := InvokeWithoutEventTime(ctx, fn, &MainInput{Key: FullValue{Elm: n}}, nil) n = ret.Elm.(int) } b.Log(n) @@ -464,7 +512,7 @@ func BenchmarkInvokeFnCallExtra(b *testing.B) { ctx := context.Background() n := 0 for i := 0; i < b.N; i++ { - ret, _ := InvokeWithoutEventTime(ctx, fn, nil, n) + ret, _ := InvokeWithoutEventTime(ctx, fn, nil, nil, n) n = ret.Elm.(int) } b.Log(n) diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go index acd086745ce7a..aefcd57e0ed34 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go @@ -42,6 +42,7 @@ type ParDo struct { emitters []ReusableEmitter ctx context.Context inv *invoker + bf *bundleFinalizer reader StateReader cache *cacheElm @@ -83,7 +84,7 @@ func (n *ParDo) Up(ctx context.Context) error { // Subsequent bundles might run this same node, and the context here would be // incorrectly refering to the older bundleId. 
setupCtx := metrics.SetPTransformID(ctx, n.PID) - if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil); err != nil { + if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil, nil); err != nil { return n.fail(err) } @@ -229,7 +230,7 @@ func (n *ParDo) Down(ctx context.Context) error { n.reader = nil n.cache = nil - if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil); err != nil { + if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil, nil); err != nil { n.err.TrySetError(err) } return n.err.Error() @@ -295,7 +296,7 @@ func (n *ParDo) invokeDataFn(ctx context.Context, pn typex.PaneInfo, ws []typex. if err := n.preInvoke(ctx, ws, ts); err != nil { return nil, err } - val, err = Invoke(ctx, pn, ws, ts, fn, opt, n.cache.extra...) + val, err = Invoke(ctx, pn, ws, ts, fn, opt, n.bf, n.cache.extra...) if err != nil { return nil, err } @@ -313,7 +314,7 @@ func (n *ParDo) invokeProcessFn(ctx context.Context, pn typex.PaneInfo, ws []typ if err := n.preInvoke(ctx, ws, ts); err != nil { return nil, err } - val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, n.cache.extra...) + val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, n.bf, n.cache.extra...) 
if err != nil { return nil, err } diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index 7f89ce37322c3..abda426f639c6 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) @@ -33,6 +34,7 @@ type Plan struct { roots []Root units []Unit pcols []*PCollection + bf *bundleFinalizer status Status @@ -45,6 +47,10 @@ func NewPlan(id string, units []Unit) (*Plan, error) { var roots []Root var pcols []*PCollection var source *DataSource + bf := bundleFinalizer{ + callbacks: []bundleFinalizationCallback{}, + lastValidCallback: time.Now(), + } for _, u := range units { if u == nil { @@ -59,6 +65,9 @@ func NewPlan(id string, units []Unit) (*Plan, error) { if p, ok := u.(*PCollection); ok { pcols = append(pcols, p) } + if p, ok := u.(*ParDo); ok { + p.bf = &bf + } } if len(roots) == 0 { return nil, errors.Errorf("no root units") @@ -70,6 +79,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) { roots: roots, units: units, pcols: pcols, + bf: &bf, source: source, }, nil } @@ -131,6 +141,46 @@ func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) erro return nil } +// Finalize runs any callbacks registered by the bundleFinalizer. Should be run on bundle finalization. 
+func (p *Plan) Finalize() error { + if p.status != Up { + return errors.Errorf("invalid status for plan %v: %v", p.id, p.status) + } + failedIndices := []int{} + for idx, bfc := range p.bf.callbacks { + if time.Now().Before(bfc.validUntil) { + if err := bfc.callback(); err != nil { + failedIndices = append(failedIndices, idx) + } + } + } + + newFinalizer := bundleFinalizer{ + callbacks: []bundleFinalizationCallback{}, + lastValidCallback: time.Now(), + } + + for _, idx := range failedIndices { + newFinalizer.callbacks = append(newFinalizer.callbacks, p.bf.callbacks[idx]) + if newFinalizer.lastValidCallback.Before(p.bf.callbacks[idx].validUntil) { + newFinalizer.lastValidCallback = p.bf.callbacks[idx].validUntil + } + } + + p.bf = &newFinalizer + + if len(failedIndices) > 0 { + return errors.Errorf("Plan %v failed %v callbacks", p.ID(), len(failedIndices)) + } + return nil +} + +// GetExpirationTime returns the last expiration time of any of the callbacks registered by the bundleFinalizer. +// Once we have passed this time, it is safe to move this plan to inactive without missing any valid callbacks. +func (p *Plan) GetExpirationTime() time.Time { + return p.bf.lastValidCallback +} + // Down takes the plan and associated units down. Does not panic. func (p *Plan) Down(ctx context.Context) error { if p.status == Down { diff --git a/sdks/go/pkg/beam/core/runtime/exec/util.go b/sdks/go/pkg/beam/core/runtime/exec/util.go index 2996b6dd159b9..019b190705706 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/util.go +++ b/sdks/go/pkg/beam/core/runtime/exec/util.go @@ -72,7 +72,7 @@ func MultiStartBundle(ctx context.Context, id string, data DataContext, list ... return nil } -// MultiFinishBundle calls StartBundle on multiple nodes. Convenience function. +// MultiFinishBundle calls FinishBundle on multiple nodes. Convenience function. 
func MultiFinishBundle(ctx context.Context, list ...Node) error { for _, n := range list { if err := n.FinishBundle(ctx); err != nil { diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 5b0e77883f432..5b861f15188a4 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -102,16 +102,17 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { sideCache.Init(cacheSize) ctrl := &control{ - lookupDesc: lookupDesc, - descriptors: make(map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor), - plans: make(map[bundleDescriptorID][]*exec.Plan), - active: make(map[instructionID]*exec.Plan), - inactive: newCircleBuffer(), - metStore: make(map[instructionID]*metrics.Store), - failed: make(map[instructionID]error), - data: &DataChannelManager{}, - state: &StateChannelManager{}, - cache: &sideCache, + lookupDesc: lookupDesc, + descriptors: make(map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor), + plans: make(map[bundleDescriptorID][]*exec.Plan), + active: make(map[instructionID]*exec.Plan), + awaitingFinalization: make(map[instructionID]awaitingFinalization), + inactive: newCircleBuffer(), + metStore: make(map[instructionID]*metrics.Store), + failed: make(map[instructionID]error), + data: &DataChannelManager{}, + state: &StateChannelManager{}, + cache: &sideCache, } // gRPC requires all readers of a stream be the same goroutine, so this goroutine @@ -222,11 +223,19 @@ func (c *circleBuffer) Contains(instID instructionID) bool { return ok } +type awaitingFinalization struct { + expiration time.Time + plan *exec.Plan + bdID bundleDescriptorID +} + type control struct { lookupDesc func(bundleDescriptorID) (*fnpb.ProcessBundleDescriptor, error) descriptors map[bundleDescriptorID]*fnpb.ProcessBundleDescriptor // protected by mu // plans that are candidates for execution. 
plans map[bundleDescriptorID][]*exec.Plan // protected by mu + // plans that are awaiting bundle finalization. + awaitingFinalization map[instructionID]awaitingFinalization // protected by mu // plans that are actively being executed. // a plan can only be in one of these maps at any time. active map[instructionID]*exec.Plan // protected by mu @@ -338,14 +347,36 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe c.cache.CompleteBundle(tokens...) mons, pylds := monitoring(plan, store) + requiresFinalization := false // Move the plan back to the candidate state c.mu.Lock() // Mark the instruction as failed. if err != nil { c.failed[instID] = err } else { - // Non failure plans can be re-used. - c.plans[bdID] = append(c.plans[bdID], plan) + // Non failure plans should either be moved to the finalized state + // or to plans so they can be re-used. + expiration := plan.GetExpirationTime() + if time.Now().Before(expiration) { + // TODO(BEAM-10976) - we can be a little smarter about data structures here + // by storing plans awaiting finalization in a heap. 
That way when we expire plans + // here it's O(1) instead of O(n) (though adding/finalizing will still be O(logn)) + requiresFinalization = true + c.awaitingFinalization[instID] = awaitingFinalization{ + expiration: expiration, + plan: plan, + bdID: bdID, + } + // Move any plans that have exceeded their expiration back into the re-use pool + for id, af := range c.awaitingFinalization { + if time.Now().After(af.expiration) { + c.plans[af.bdID] = append(c.plans[af.bdID], af.plan) + delete(c.awaitingFinalization, id) + } + } + } else { + c.plans[bdID] = append(c.plans[bdID], plan) + } } delete(c.active, instID) if removed, ok := c.inactive.Insert(instID); ok { @@ -362,12 +393,38 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe InstructionId: string(instID), Response: &fnpb.InstructionResponse_ProcessBundle{ ProcessBundle: &fnpb.ProcessBundleResponse{ - MonitoringData: pylds, - MonitoringInfos: mons, + MonitoringData: pylds, + MonitoringInfos: mons, + RequiresFinalization: requiresFinalization, }, }, } + case req.GetFinalizeBundle() != nil: + msg := req.GetFinalizeBundle() + + ref := instructionID(msg.GetInstructionId()) + + af, ok := c.awaitingFinalization[ref] + if !ok { + return fail(ctx, instID, "finalize bundle failed for instruction %v: couldn't find plan in finalizing map", ref) + } + + if time.Now().Before(af.expiration) { + if err := af.plan.Finalize(); err != nil { + return fail(ctx, instID, "finalize bundle failed for instruction %v using plan %v : %v", ref, af.bdID, err) + } + } + c.plans[af.bdID] = append(c.plans[af.bdID], af.plan) + delete(c.awaitingFinalization, ref) + + return &fnpb.InstructionResponse{ + InstructionId: string(instID), + Response: &fnpb.InstructionResponse_FinalizeBundle{ + FinalizeBundle: &fnpb.FinalizeBundleResponse{}, + }, + } + case req.GetProcessBundleProgress() != nil: msg := req.GetProcessBundleProgress() @@ -506,6 +563,13 @@ func (c *control) handleInstruction(ctx context.Context, req 
*fnpb.InstructionRe func (c *control) getPlanOrResponse(ctx context.Context, kind string, instID, ref instructionID) (*exec.Plan, *metrics.Store, *fnpb.InstructionResponse) { c.mu.Lock() plan, ok := c.active[ref] + if !ok { + var af awaitingFinalization + af, ok = c.awaitingFinalization[ref] + if ok { + plan = af.plan + } + } err := c.failed[ref] store := c.metStore[ref] defer c.mu.Unlock() diff --git a/sdks/go/pkg/beam/runners/direct/buffer.go b/sdks/go/pkg/beam/runners/direct/buffer.go index 901cfe929af78..8e92ff72d0b52 100644 --- a/sdks/go/pkg/beam/runners/direct/buffer.go +++ b/sdks/go/pkg/beam/runners/direct/buffer.go @@ -163,7 +163,6 @@ func (w *wait) FinishBundle(ctx context.Context) error { } w.done = true return w.next.FinishBundle(ctx) - } func (w *wait) Down(ctx context.Context) error {