[Bug]: Windowing Causes Panics In Timer and State #32559

Closed
1 of 17 tasks
SamStentz opened this issue Sep 25, 2024 · 7 comments

Comments

@SamStentz

What happened?

I've been trying to use the new Beam state and timers APIs to implement a retry policy. This retry policy:

  1. uses a processing time timer to defer execution
  2. places event time and values in state to use in OnTimer
  3. sets the timer's output timestamp to the incoming element's event time

The input is a keyed PCollection with a single entry. It seems that windowing causes my ParDo to panic, which surprises me given what I read in stateful and timely processing and the Tour of Beam (particularly this example).

Using a debugger to move through my code, the panic occurs after ProcessElement and before OnTimer is called.

2024/09/25 16:40:30 INFO ProcessElement <%!s(int=0), foo> source=/.../poc/donothing.go:45 time=2024-09-25T16:40:30.856Z worker.ID=job-001[go-job-1-1727282430841761429]_go worker.endpoint=localhost:46073
panic: unable to decode bool; expected {0, 1} got 127

I'm using this package in go.mod:

github.com/apache/beam/sdks/v2 v2.58.0

donothing.go

package poc

import (
	"context"
	"reflect"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)

func init() {
	runtime.RegisterFunction(NewDoNothingTransform)
	runtime.RegisterFunction(DoNothingTransform)
	runtime.RegisterType(reflect.TypeOf((*doNothingTransformFn)(nil)).Elem())
	runtime.RegisterType(reflect.TypeOf((*context.Context)(nil)).Elem())
}

type doNothingTransformFn struct {
	TimerCount      state.Value[int64]
	TimerTimerstamp state.Value[int64]
	Value           state.Value[string]
	OutputState     timers.ProcessingTime
}

func NewDoNothingTransform() *doNothingTransformFn {
	return &doNothingTransformFn{
		TimerCount:      state.MakeValueState[int64]("timerCount"),
		TimerTimerstamp: state.MakeValueState[int64]("timerTimerstamp"),
		Value:           state.MakeValueState[string]("value"),
		OutputState:     timers.InProcessingTime("processingTime"),
	}
}

func (fn *doNothingTransformFn) ProcessElement(ctx context.Context,
	et beam.EventTime,
	sp state.Provider, tp timers.Provider,
	k beam.X,
	v string,
	_ func(string)) {

	log.Infof(ctx, "ProcessElement <%s, %s>", k, v)

	err := fn.TimerCount.Write(sp, 0)
	if err != nil {
		panic("couldn't set TimerCount state")
	}
	err = fn.TimerTimerstamp.Write(sp, et.ToTime().UnixMilli())
	if err != nil {
		panic("couldn't set TimerTimerstamp state")
	}
	err = fn.Value.Write(sp, v)
	if err != nil {
		panic("couldn't set Value state")
	}
	// Set timer for 1 second processing time.
	fn.OutputState.Set(tp, time.Now().Add(time.Second), timers.WithOutputTimestamp(et.ToTime()))
}

func (fn *doNothingTransformFn) OnTimer(ctx context.Context,
	sp state.Provider, tp timers.Provider,
	k beam.X, timer timers.Context,
	emit func(string)) {
	// Read state.
	tc, tcp, err := fn.TimerCount.Read(sp)
	if err != nil || !tcp {
		panic("couldn't read TimerCount state")
	}
	timerTimerstamp, tsp, err := fn.TimerTimerstamp.Read(sp)
	if err != nil || !tsp {
		panic("couldn't read TimerTimerstamp state")
	}
	ts := time.UnixMilli(timerTimerstamp)
	v, _, err := fn.Value.Read(sp)
	if err != nil {
		panic("couldn't read Value state")
	}
	log.Infof(ctx, "OnTimer <%s, %s>: count %d", k, v, tc)
	// terminating condition of 100 timer calls.
	if tc > 100 {
		emit(v)
		return
	}
	// Set timer for 1 second processing time.
	fn.OutputState.Set(tp, time.Now().Add(time.Second), timers.WithOutputTimestamp(ts))
	err = fn.TimerCount.Write(sp, tc+1)
	if err != nil {
		log.Errorf(ctx, "error writing timer time: %v", err)
	}
}

func DoNothingTransform(scope beam.Scope, in beam.PCollection) beam.PCollection {
	return beam.ParDo(scope.Scope("DoNothing"), NewDoNothingTransform(), in)
}

donothing_test.go

package poc

import (
	"testing"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func TestDoNothingTransform(t *testing.T) {
	beam.Init()
	pipeline, scope, col := ptest.CreateList([]string{"foo"})
	col = beam.AddFixedKey(scope, col)

	// If I add this window we hit panics.
	col = beam.WindowInto(scope, window.NewFixedWindows(time.Second), col)

	DoNothingTransform(scope, col)

	ptest.RunAndValidate(t, pipeline)
}

func TestMain(m *testing.M) {
	ptest.MainWithDefault(m, "prism")
}

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@SamStentz SamStentz changed the title [Bug]: Windowing Causes Panics [Bug]: Windowing Causes Panics In Timer and State Sep 25, 2024
@SamStentz
Author

SamStentz commented Sep 25, 2024

I'm also hitting slightly different behavior when I deploy a pipeline to the GCP Dataflow runner.

There, after an indeterminate number of retries, reading state in OnTimer shows that the state has been wiped (the snippet below returns tcp = false).

	tc, tcp, err := fn.TimerCount.Read(sp)
	if err != nil || !tcp {
		panic("couldn't read TimerCount state")
	}

Am I incorrect in assuming that if my ProcessElement function always sets state values, then OnTimer will always have that value populated? I was using

timers.WithOutputTimestamp(ts)

to ensure the watermark never advances and thus the ProcessBundle never completes.

For context, I am trying to set this timer for every element in a PCollection, so I am adding a UUID key. Regardless, I wouldn't expect this to result in wiped state.

I added a log in FinishBundle for the above run, and can confirm that the bundle is being closed before OnTimer is called. In other words, the behavior is:

  1. OnTimer logs X times for successful retries
  2. "Closing bundle!" is logged
  3. The OnTimer "state not present" error occurs (I made this an error log instead of a panic)
  4. "Closing bundle!" is logged again.

func (fn *doNothingTransformFn) FinishBundle(ctx context.Context, _ func(string)) error {
	log.Infof(ctx, "Closing bundle!")
	return nil
}

@lostluck lostluck self-assigned this Sep 25, 2024
@lostluck
Contributor

Obligatory question about whether the bug still exists in 2.59.0? Prism is under active development, so it's not impossible it was fixed between versions.

@lostluck
Contributor

#32559 (comment) is probably a separate issue.

I would not yet vouch for the behavior of ProcessingTime timers without using TestStream, as we have yet to add the ability for Prism to actually execute in "real" time, so it's not going to execute in a fashion comparable to a production runner like Dataflow.

It's important to note that setting the OutputTimestamp only holds back the downstream watermark, not the upstream watermark, which determines EventTime firing. Also, short of literally blocking execution within a DoFn, ProcessBundle will eventually complete once all user code has returned.
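
To make the upstream/downstream distinction concrete, here is a toy model in plain Go. It is not Beam's implementation: `outputWatermark` and the millisecond values are made up for illustration, and it assumes the downstream watermark is simply the upstream watermark capped by the earliest pending output-timestamp hold.

```go
package main

import "fmt"

// outputWatermark is a hypothetical helper (not a Beam API). It models the
// downstream watermark as the upstream watermark capped by the earliest
// timer output-timestamp hold. All values are milliseconds.
func outputWatermark(upstreamMillis int64, holdsMillis []int64) int64 {
	out := upstreamMillis
	for _, h := range holdsMillis {
		if h < out {
			out = h
		}
	}
	return out
}

func main() {
	holds := []int64{1000} // assumed output timestamp of one pending timer

	// The upstream watermark keeps advancing regardless of the hold.
	for _, upstream := range []int64{500, 1000, 5000, 10000} {
		fmt.Printf("upstream=%d downstream=%d\n", upstream, outputWatermark(upstream, holds))
	}
}
```

In this model, a hold at 1000 ms keeps the downstream watermark at 1000 ms even after the upstream watermark reaches 10000 ms, so anything driven by the upstream watermark is unaffected by the hold.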

The nil/empty bytes issue might have been #32245, which caused either corruption or zeroing when built with Go 1.23.0+.

@lostluck
Contributor

Finally, please include more of the panic trace.

Based on the provided information, it happens on line 45 of the given file, which is the line log.Infof(ctx, "ProcessElement <%s, %s>", k, v) inside func (fn *doNothingTransformFn) ProcessElement. That does line up with what the description says, but it doesn't have where the panic actually happened, which might be important.
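
As an aside, the `%!s(int=0)` in that log line is just Go's fmt package reporting that an int key was formatted with `%s`; it is unrelated to the panic. A minimal sketch, where `formatPair` is a hypothetical helper and not part of the repro:

```go
package main

import "fmt"

// formatPair is a hypothetical helper that formats a key/value pair with the
// given format string, mirroring the log call in the repro.
func formatPair(format string, k any, v string) string {
	return fmt.Sprintf(format, k, v)
}

func main() {
	// %s with an int key yields the "%!s(int=0)" marker seen in the log.
	fmt.Println(formatPair("ProcessElement <%s, %s>", 0, "foo"))
	// %v formats any key type without the marker.
	fmt.Println(formatPair("ProcessElement <%v, %s>", 0, "foo"))
}
```

Using `%v` for the key would make the log line readable for any key type.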

I think I saw errors like this when running the Java Validates Runner tests, but had not chased them down specifically (there were other features to resolve there first). So having a Go repro shows it's not simply the Java SDK doing something Prism can't yet tolerate.

@SamStentz
Author

question about whether the bug still exists in 2.59.0? Prism is under active development, so it's not impossible it was fixed between versions.

Great news! I upgraded to 2.59.0 and the panics I saw earlier are indeed gone! As for where the panic was occurring, I can pull that stack trace if you would like, but since it no longer occurs, maybe that's unneeded?

@lostluck
Contributor

Glad to hear it! I don't think the stack trace is necessary now that the panic is gone. I recall having to fix timer handling, where bytes were being incorrectly interpreted, in order to pass additional tests, so that's probably what did it.

Closing this issue for now.
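
For anyone landing here later and wondering how misinterpreted bytes can surface as the panic quoted at the top of this issue, here is a minimal sketch. It is an assumption-laden illustration, not Beam's actual coder: `decodeBool` is a hypothetical strict decoder that only borrows the error text from the report.

```go
package main

import "fmt"

// decodeBool is a hypothetical strict decoder (not Beam's code): it reads the
// first byte of data and accepts only 0 (false) or 1 (true).
func decodeBool(data []byte) (bool, error) {
	if len(data) == 0 {
		return false, fmt.Errorf("unable to decode bool; no bytes available")
	}
	switch data[0] {
	case 0:
		return false, nil
	case 1:
		return true, nil
	}
	return false, fmt.Errorf("unable to decode bool; expected {0, 1} got %v", data[0])
}

func main() {
	// A byte that belongs to some other field (here 127) is handed to the
	// bool decoder, for example because the reader is misaligned in the stream.
	if _, err := decodeBool([]byte{127}); err != nil {
		fmt.Println("error:", err)
	}
}
```

The point is only that a decoder reading from the wrong offset sees a byte such as 127 where it expects 0 or 1, which matches the shape of the reported message.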

@github-actions github-actions bot added this to the 2.60.0 Release milestone Sep 25, 2024
@SamStentz
Author

Sounds good. As a follow-up, I'm trying to get a reproducible example of a Dataflow-deployed DoFn causing the issue before filing a separate issue. Thanks again for the help.
