Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#24789] Remainder of changes from #27550. #27822

Merged
merged 21 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions sdks/go/pkg/beam/core/runtime/harness/datamgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -128,7 +130,12 @@ func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataCha
return nil, err
}
ch.forceRecreate = func(id string, err error) {
log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v due to %v", id, port, err)
switch status.Code(err) {
case codes.Canceled:
// Don't log on context canceled path.
default:
log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v due to %v", id, port, err)
}
m.mu.Lock()
delete(m.ports, port.URL)
m.mu.Unlock()
Expand Down Expand Up @@ -371,7 +378,8 @@ func (c *DataChannel) read(ctx context.Context) {
c.terminateStreamOnError(err)
c.mu.Unlock()

if err == io.EOF {
st := status.Code(err)
if st == codes.Canceled || err == io.EOF {
return
}
log.Errorf(ctx, "DataChannel.read %v bad: %v", c.id, err)
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,6 @@ func fail(ctx context.Context, id instructionID, format string, args ...any) *fn
// dial to the specified endpoint. if timeout <=0, call blocks until
// grpc.Dial succeeds.
func dial(ctx context.Context, endpoint, purpose string, timeout time.Duration) (*grpc.ClientConn, error) {
log.Infof(ctx, "Connecting via grpc @ %s for %s ...", endpoint, purpose)
log.Output(ctx, log.SevDebug, 1, fmt.Sprintf("Connecting via grpc @ %s for %s ...", endpoint, purpose))
return grpcx.Dial(ctx, endpoint, timeout)
}
9 changes: 8 additions & 1 deletion sdks/go/pkg/beam/core/runtime/harness/statemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type writeTypeEnum int32
Expand Down Expand Up @@ -525,7 +527,12 @@ func (m *StateChannelManager) Open(ctx context.Context, port exec.Port) (*StateC
return nil, err
}
ch.forceRecreate = func(id string, err error) {
log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port %v due to %v", id, port, err)
switch status.Code(err) {
case codes.Canceled:
// Don't log on context canceled path.
default:
log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port %v due to %v", id, port, err)
}
m.mu.Lock()
delete(m.ports, port.URL)
m.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/symbols.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,5 @@ func ResolveFunction(name string, t reflect.Type) (any, error) {
type failResolver bool

func (p failResolver) Sym2Addr(name string) (uintptr, error) {
return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
return 0, errors.Errorf("%v not found. Register DoFns and functions with the the beam/register package.", name)
}
5 changes: 5 additions & 0 deletions sdks/go/pkg/beam/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func createList(s Scope, values []any, t reflect.Type) (PCollection, error) {

// TODO(herohde) 6/26/2017: make 'create' a SDF once supported. See BEAM-2421.

func init() {
register.DoFn2x1[[]byte, func(T), error]((*createFn)(nil))
register.Emitter1[T]()
}

type createFn struct {
Values [][]byte `json:"values"`
Type EncodedType `json:"type"`
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func TestCreateList(t *testing.T) {
{[]float64{float64(0.1), float64(0.2), float64(0.3)}},
{[]uint{uint(1), uint(2), uint(3)}},
{[]bool{false, true, true, false, true}},
{[]wc{wc{"a", 23}, wc{"b", 42}, wc{"c", 5}}},
{[]*testProto{&testProto{}, &testProto{stringValue("test")}}}, // Test for BEAM-4401
{[]wc{{"a", 23}, {"b", 42}, {"c", 5}}},
{[]*testProto{{}, {stringValue("test")}}}, // Test for BEAM-4401
}

for _, test := range tests {
Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/io/databaseio/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/apache/beam/sdks/v2/go/pkg/beam"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
_ "github.com/proullon/ramsql/driver"
Expand Down
5 changes: 5 additions & 0 deletions sdks/go/pkg/beam/io/datastoreio/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type Foo struct {
type Bar struct {
}

func init() {
beam.RegisterType(reflect.TypeOf((*Foo)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*Bar)(nil)).Elem())
}

func Test_query(t *testing.T) {
testCases := []struct {
v any
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/pardo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ func testFunction() int64 {

func TestFormatParDoError(t *testing.T) {
got := formatParDoError(testFunction, 2, 1)
want := "beam.testFunction has 2 outputs, but ParDo requires 1 outputs, use ParDo2 instead."
want := "has 2 outputs, but ParDo requires 1 outputs, use ParDo2 instead."
if !strings.Contains(got, want) {
t.Errorf("formatParDoError(testFunction,2,1) = %v, want = %v", got, want)
t.Errorf("formatParDoError(testFunction,2,1) = \n%q want =\n%q", got, want)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,6 @@ func (ss *stageState) bundleReady(em *ElementManager) (mtime.Time, bool) {
ready := true
for _, side := range ss.sides {
pID, ok := em.pcolParents[side]
// These panics indicate pre-process/stage construction problems.
if !ok {
panic(fmt.Sprintf("stage[%v] no parent ID for side input %v", ss.ID, side))
}
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
type RunnerCharacteristic struct {
SDKFlatten bool // Sets whether we should force an SDK side flatten.
SDKGBK bool // Sets whether the GBK should be handled by the SDK, if possible by the SDK.
SDKReshuffle bool
SDKReshuffle bool // Sets whether we should use the SDK backup implementation to handle a Reshuffle.
}

func Runner(config any) *runner {
Expand Down
8 changes: 6 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type link struct {
// should in principle be able to connect two SDK environments directly
// instead of going through the runner at all, which would be a small
// efficiency gain, in runner memory use.
//
// That would also warrant an execution mode where fusion is taken into
// account, but all serialization boundaries remain since the pcollections
// would continue to get serialized.
type stage struct {
ID string
transforms []string
Expand Down Expand Up @@ -145,11 +149,11 @@ progress:
if previousIndex == index && !splitsDone {
sr, err := b.Split(wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
if err != nil {
slog.Debug("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error())
slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error())
break progress
}
if sr.GetChannelSplits() == nil {
slog.Warn("split failed", "bundle", rb)
slog.Debug("SDK returned no splits", "bundle", rb)
splitsDone = true
continue progress
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func TestUnimplemented(t *testing.T) {
tests := []struct {
pipeline func(s beam.Scope)
}{
// These tests don't terminate, so can't be run.
// {pipeline: primitives.Drain}, // Can't test drain automatically yet.

{pipeline: primitives.TestStreamBoolSequence},
Expand Down
2 changes: 2 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (b *B) Cleanup(wk *W) {
wk.mu.Unlock()
}

// Progress sends a progress request for the given bundle to the passed in worker, blocking on the response.
func (b *B) Progress(wk *W) (*fnpb.ProcessBundleProgressResponse, error) {
resp := wk.sendInstruction(&fnpb.InstructionRequest{
Request: &fnpb.InstructionRequest_ProcessBundleProgress{
Expand All @@ -159,6 +160,7 @@ func (b *B) Progress(wk *W) (*fnpb.ProcessBundleProgressResponse, error) {
return resp.GetProcessBundleProgress(), nil
}

// Split sends a split request for the given bundle to the passed in worker, blocking on the response.
func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) (*fnpb.ProcessBundleSplitResponse, error) {
resp := wk.sendInstruction(&fnpb.InstructionRequest{
Request: &fnpb.InstructionRequest_ProcessBundleSplit{
Expand Down
7 changes: 4 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
// TODO: Do more than assume these are ProcessBundleResponses.
wk.mu.Lock()
if b, ok := wk.activeInstructions[resp.GetInstructionId()]; ok {
// Error is handled in the resonse handler.
b.Respond(resp)
} else {
slog.Debug("ctrl.Recv: %v", resp)
Expand All @@ -268,7 +267,10 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
for {
select {
case req := <-wk.InstReqs:
ctrl.Send(req)
err := ctrl.Send(req)
if err != nil {
return err
}
case <-ctrl.Context().Done():
slog.Debug("Control context canceled")
return ctrl.Context().Err()
Expand Down Expand Up @@ -322,7 +324,6 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
wk.mu.Unlock()
}
}()

for {
select {
case req, ok := <-wk.DataReqs:
Expand Down
16 changes: 8 additions & 8 deletions sdks/go/pkg/beam/runners/universal/runnerlib/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,16 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID
return errors.Wrap(err, "failed to get job stream")
}

mostRecentError := errors.New("<no error received, see runner logs>")
mostRecentError := "<no error received>"
var errReceived, jobFailed bool

for {
msg, err := stream.Recv()
if err != nil {
if err == io.EOF {
if jobFailed {
// Connection finished with a failed status, so produce what we have.
return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError)
// Connection finished, so time to exit, produce what we have.
return errors.Errorf("job %v failed:\n%v", jobID, mostRecentError)
}
return nil
}
Expand All @@ -123,17 +123,17 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID
case msg.GetStateResponse() != nil:
resp := msg.GetStateResponse()

log.Infof(ctx, "Job state: %v", resp.GetState().String())
log.Infof(ctx, "Job[%v] state: %v", jobID, resp.GetState().String())

switch resp.State {
case jobpb.JobState_DONE, jobpb.JobState_CANCELLED:
return nil
case jobpb.JobState_FAILED:
jobFailed = true
if errReceived {
return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError)
return errors.Errorf("job %v failed:\n%v", jobID, mostRecentError)
}
// Otherwise, wait for at least one error log from the runner, or the connection to close.
// Otherwise we should wait for at least one error log from the runner.
}

case msg.GetMessageResponse() != nil:
Expand All @@ -144,10 +144,10 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID

if resp.GetImportance() >= jobpb.JobMessage_JOB_MESSAGE_ERROR {
errReceived = true
mostRecentError = errors.New(resp.GetMessageText())
mostRecentError = resp.GetMessageText()

if jobFailed {
return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError)
return errors.Errorf("job %v failed:\n%w", jobID, errors.New(mostRecentError))
}
}

Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/vet/vet.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func init() {
type disabledResolver bool

func (p disabledResolver) Sym2Addr(name string) (uintptr, error) {
return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
return 0, errors.Errorf("%v not found. Register DoFns and functions with the beam/register package.", name)
}

// Execute evaluates the pipeline on whether it can run without reflection.
Expand Down
3 changes: 1 addition & 2 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ var portableFilters = []string{
"TestSetStateClear",
}

// TODO(lostluck): set up a specific run for these.
var prismFilters = []string{
// The prism runner does not support the TestStream primitive
"TestTestStream.*",
Expand All @@ -149,7 +148,7 @@ var prismFilters = []string{
// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
"TestBigQueryIO.*",
"TestSpannerIO.*",
// The prsim runner does not support pipeline drain for SDF.
// The prism runner does not support pipeline drain for SDF.
"TestDrain",
// FhirIO currently only supports Dataflow runner
"TestFhirIO.*",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,9 @@ words = ...
{{< /highlight >}}

{{< highlight go >}}

The Go SDK cannot support anonymous functions outside of the deprecated Go Direct runner.

// words is the input PCollection of strings
var words beam.PCollection = ...

Expand Down Expand Up @@ -1170,8 +1173,8 @@ words = ...
{{< /highlight >}}

{{< highlight go >}}
// words is the input PCollection of strings
var words beam.PCollection = ...

The Go SDK cannot support anonymous functions outside of the deprecated Go Direct runner.

{{< code_sample "sdks/go/examples/snippets/04transforms.go" model_pardo_apply_anon >}}
{{< /highlight >}}
Expand All @@ -1191,7 +1194,7 @@ words = ...

<span class="language-go">

> **Note:** Anonymous function DoFns may not work on distributed runners.
> **Note:** Anonymous function DoFns do not work on distributed runners.
> It's recommended to use named functions and register them with `register.FunctionXxY` in
> an `init()` block.
Expand Down