From 56e5c2c6ca574c969b53a72be51be1fa04c88786 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Mon, 7 Feb 2022 19:11:21 -0800 Subject: [PATCH] [BEAM-13647] Use role for Go worker binary. (#16729) --- .../src/main/proto/beam_runner_api.proto | 6 + sdks/go/container/boot.go | 16 +- sdks/go/pkg/beam/artifact/materialize.go | 5 +- .../pkg/beam/core/runtime/graphx/translate.go | 48 +++-- .../core/runtime/graphx/translate_test.go | 165 ++++++++++++++++++ .../runners/dataflow/dataflowlib/execute.go | 16 +- .../runners/dataflow/dataflowlib/stage.go | 18 +- .../runners/universal/runnerlib/execute.go | 64 +++++-- .../beam/runners/universal/runnerlib/stage.go | 4 +- 9 files changed, 303 insertions(+), 39 deletions(-) diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 539c9b83c350e..b4598833faa61 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -1414,6 +1414,12 @@ message StandardArtifacts { // A URN for pip-requirements-file role. // payload: None PIP_REQUIREMENTS_FILE = 1 [(beam_urn) = "beam:artifact:role:pip_requirements_file:v1"]; + + // A URN for the Go worker binary role. + // This represents the executable for a Go SDK environment. + // A Go environment may have one such artifact with this role. + // payload: None + GO_WORKER_BINARY = 2 [(beam_urn) = "beam:artifact:role:go_worker_binary:v1"]; } } diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go index 1f7ea5f099420..49d9165195ca2 100644 --- a/sdks/go/container/boot.go +++ b/sdks/go/container/boot.go @@ -99,6 +99,7 @@ func main() { log.Fatalf("Failed to retrieve staged files: %v", err) } + // TODO(BEAM-13647): Remove legacy hack once aged out. const worker = "worker" name := worker @@ -110,12 +111,23 @@ func main() { default: found := false for _, a := range artifacts { - n, _ := artifact.MustExtractFilePayload(a) - if n == worker { + if a.GetRoleUrn() == artifact.URNGoWorkerBinaryRole { + name, _ = artifact.MustExtractFilePayload(a) found = true break } } + // TODO(BEAM-13647): Remove legacy hack once aged out. + if !found { + for _, a := range artifacts { + n, _ := artifact.MustExtractFilePayload(a) + if n == worker { + found = true + log.Printf("Go worker binary found with legacy name '%v' found", worker) + break + } + } + } if !found { log.Fatalf("No artifact named '%v' found", worker) } diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index 4759c0c093cb4..3604675a1a1f9 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -45,6 +45,7 @@ import ( const ( URNFileArtifact = "beam:artifact:type:file:v1" URNUrlArtifact = "beam:artifact:type:url:v1" + URNGoWorkerBinaryRole = "beam:artifact:role:go_worker_binary:v1" URNPipRequirementsFile = "beam:artifact:role:pip_requirements_file:v1" URNStagingTo = "beam:artifact:role:staging_to:v1" NoArtifactsStaged = "__no_artifacts_staged__" @@ -172,11 +173,11 @@ func extractStagingToPath(artifact *pipepb.ArtifactInformation) (string, error) func MustExtractFilePayload(artifact *pipepb.ArtifactInformation) (string, string) { if artifact.TypeUrn != URNFileArtifact { - log.Fatalf("Unsupported artifact type #{artifact.TypeUrn}") + log.Fatalf("Unsupported artifact type %v", artifact.TypeUrn) } ty := pipepb.ArtifactFilePayload{} if err := proto.Unmarshal(artifact.TypePayload, &ty); err != nil { - log.Fatalf("failed to parse artifact file payload: #{err}") + log.Fatalf("failed to parse artifact file payload: %v", err) } return ty.Path, ty.Sha256 } diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 596ec63a1008f..a53828e061993 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -67,8 +67,17 @@ const ( URNRequiresSplittableDoFn = "beam:requirement:pardo:splittable_dofn:v1" - URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1" - URNArtifactStagingTo = "beam:artifact:role:staging_to:v1" + // Deprecated: Determine worker binary based on GoWorkerBinary Role instead. + URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1" + + URNArtifactFileType = "beam:artifact:type:file:v1" + URNArtifactURLType = "beam:artifact:type:url:v1" + URNArtifactGoWorkerRole = "beam:artifact:role:go_worker_binary:v1" + + // Environment Urns. + URNEnvProcess = "beam:env:process:v1" + URNEnvExternal = "beam:env:external:v1" + URNEnvDocker = "beam:env:docker:v1" ) func goCapabilities() []string { @@ -85,14 +94,14 @@ func goCapabilities() []string { func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig func(context.Context) string) (*pipepb.Environment, error) { var serializedPayload []byte switch urn { - case "beam:env:process:v1": + case URNEnvProcess: // TODO Support process based SDK Harness. return nil, errors.Errorf("unsupported environment %v", urn) - case "beam:env:external:v1": + case URNEnvExternal: config := extractEnvironmentConfig(ctx) payload := &pipepb.ExternalPayload{Endpoint: &pipepb.ApiServiceDescriptor{Url: config}} serializedPayload = protox.MustEncode(payload) - case "beam:env:docker:v1": + case URNEnvDocker: fallthrough default: config := extractEnvironmentConfig(ctx) @@ -105,11 +114,9 @@ func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig Capabilities: goCapabilities(), Dependencies: []*pipepb.ArtifactInformation{ { - TypeUrn: URNArtifactGoWorker, - RoleUrn: URNArtifactStagingTo, - RolePayload: protox.MustEncode(&pipepb.ArtifactStagingToRolePayload{ - StagedName: "worker", - }), + TypeUrn: URNArtifactFileType, + TypePayload: protox.MustEncode(&pipepb.ArtifactFilePayload{}), + RoleUrn: URNArtifactGoWorkerRole, }, }, }, nil @@ -981,6 +988,7 @@ func boolToBounded(bounded bool) pipepb.IsBounded_Enum { return pipepb.IsBounded_UNBOUNDED } +// defaultEnvId is the environment ID used for Go Pipeline Environments. const defaultEnvId = "go" func (m *marshaller) addDefaultEnv() string { @@ -1259,3 +1267,23 @@ func nodeID(n *graph.Node) string { func scopeID(s *graph.Scope) string { return fmt.Sprintf("s%v", s.ID()) } + +// UpdateDefaultEnvWorkerType is so runners can update the pipeline's default environment +// with the correct artifact type and payload for the Go worker binary. +func UpdateDefaultEnvWorkerType(typeUrn string, pyld []byte, p *pipepb.Pipeline) error { + // Get the Go environment out. + envs := p.GetComponents().GetEnvironments() + env, ok := envs[defaultEnvId] + if !ok { + return errors.Errorf("unable to find default Go environment with ID %q", defaultEnvId) + } + for _, dep := range env.GetDependencies() { + if dep.RoleUrn != URNArtifactGoWorkerRole { + continue + } + dep.TypeUrn = typeUrn + dep.TypePayload = pyld + return nil + } + return errors.Errorf("unable to find dependency with %q role in environment with ID %q,", URNArtifactGoWorkerRole, defaultEnvId) +} diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go index 411e0f22d5514..1b8389edc493e 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go @@ -16,10 +16,13 @@ package graphx_test import ( + "context" "reflect" "testing" "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/testing/protocmp" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" @@ -27,6 +30,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/golang/protobuf/proto" @@ -206,3 +210,164 @@ func (fn *splitPickFn) CreateTracker(_ int) *testRT { return &testRT{} func (fn *splitPickFn) ProcessElement(_ *testRT, a int, small, big func(int)) { pickFn(a, small, big) } + +func TestCreateEnvironment(t *testing.T) { + t.Run("process", func(t *testing.T) { + const wantEnv = "process" + urn := graphx.URNEnvProcess + got, err := graphx.CreateEnvironment(context.Background(), urn, func(_ context.Context) string { return wantEnv }) + if err == nil { + t.Errorf("CreateEnvironment(%v) = %v error, want error since it's unsupported", urn, err) + } + want := (*pipepb.Environment)(nil) + if !proto.Equal(got, want) { + t.Errorf("CreateEnvironment(%v) = %v, want %v since it's unsupported", urn, got, want) + } + }) + tests := []struct { + name string + urn string + payload func(name string) []byte + }{ + { + name: "external", + urn: graphx.URNEnvExternal, + payload: func(name string) []byte { + return protox.MustEncode(&pipepb.ExternalPayload{ + Endpoint: &pipepb.ApiServiceDescriptor{ + Url: name, + }, + }) + }, + }, { + name: "docker", + urn: graphx.URNEnvDocker, + payload: func(name string) []byte { + return protox.MustEncode(&pipepb.DockerPayload{ + ContainerImage: name, + }) + }, + }, + } + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + got, err := graphx.CreateEnvironment(context.Background(), test.urn, func(_ context.Context) string { return test.name }) + if err != nil { + t.Errorf("CreateEnvironment(%v) = %v error, want nil", test.urn, err) + } + want := &pipepb.Environment{ + Urn: test.urn, + Payload: test.payload(test.name), + Dependencies: []*pipepb.ArtifactInformation{ + { + TypeUrn: graphx.URNArtifactFileType, + TypePayload: protox.MustEncode(&pipepb.ArtifactFilePayload{}), + RoleUrn: graphx.URNArtifactGoWorkerRole, + }, + }, + } + opts := []cmp.Option{ + protocmp.Transform(), + // Ignore the capabilities field, since we can't access that method here. + protocmp.IgnoreFields(&pipepb.Environment{}, protoreflect.Name("capabilities")), + } + if d := cmp.Diff(want, got, opts...); d != "" { + t.Errorf("CreateEnvironment(%v) diff (-want, +got):\n%v", test.urn, d) + } + }) + } +} + +func TestUpdateDefaultEnvWorkerType(t *testing.T) { + t.Run("noEnvs", func(t *testing.T) { + if err := graphx.UpdateDefaultEnvWorkerType("unused", nil, &pipepb.Pipeline{ + Components: &pipepb.Components{}, + }); err == nil { + t.Error("UpdateDefaultEnvWorkerType() no error, want err") + } + }) + t.Run("noGoEnvs", func(t *testing.T) { + if err := graphx.UpdateDefaultEnvWorkerType("unused", nil, &pipepb.Pipeline{ + Components: &pipepb.Components{ + Environments: map[string]*pipepb.Environment{ + "java": {Urn: "java"}, + "python": {Urn: "python"}, + "typescript": {Urn: "typescript"}, + }, + }, + }); err == nil { + t.Error("UpdateDefaultEnvWorkerType() no error, want err") + } + }) + t.Run("badGoEnv", func(t *testing.T) { + if err := graphx.UpdateDefaultEnvWorkerType("unused", nil, &pipepb.Pipeline{ + Components: &pipepb.Components{ + Environments: map[string]*pipepb.Environment{ + "java": {Urn: "java"}, + "python": {Urn: "python"}, + "typescript": {Urn: "typescript"}, + "go": { + Urn: "test", + Payload: []byte("test"), + Dependencies: []*pipepb.ArtifactInformation{ + { + RoleUrn: "unset", + }, + }, + }, + }, + }, + }); err == nil { + t.Error("UpdateDefaultEnvWorkerType() no error, want err") + } + }) + t.Run("goEnv", func(t *testing.T) { + wantUrn := graphx.URNArtifactFileType + wantPyld := protox.MustEncode(&pipepb.ArtifactFilePayload{ + Path: "good", + }) + p := &pipepb.Pipeline{ + Components: &pipepb.Components{ + Environments: map[string]*pipepb.Environment{ + "java": {Urn: "java"}, + "python": {Urn: "python"}, + "typescript": {Urn: "typescript"}, + "go": { + Urn: "test", + Payload: []byte("test"), + Dependencies: []*pipepb.ArtifactInformation{ + { + TypeUrn: "to be removed", + TypePayload: nil, + RoleUrn: graphx.URNArtifactGoWorkerRole, + }, + }, + }, + }, + }, + } + if err := graphx.UpdateDefaultEnvWorkerType(wantUrn, wantPyld, p); err != nil { + t.Errorf("UpdateDefaultEnvWorkerType() = %v, want nil", err) + } + got := p.GetComponents().GetEnvironments()["go"] + want := &pipepb.Environment{ + Urn: "test", + Payload: []byte("test"), + Dependencies: []*pipepb.ArtifactInformation{ + { + TypeUrn: wantUrn, + TypePayload: wantPyld, + RoleUrn: graphx.URNArtifactGoWorkerRole, + }, + }, + } + opts := []cmp.Option{ + protocmp.Transform(), + } + if d := cmp.Diff(want, got, opts...); d != "" { + t.Errorf("UpdateDefaultEnvWorkerType() diff (-want, +got):\n%v", d) + } + }) + +} diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go index 9727a285900f3..abc4db75145d9 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go @@ -23,6 +23,7 @@ import ( "os" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" @@ -59,16 +60,25 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker } log.Infof(ctx, "Staging worker binary: %v", bin) - - if err := StageFile(ctx, opts.Project, workerURL, bin); err != nil { + hash, err := stageFile(ctx, opts.Project, workerURL, bin) + if err != nil { return presult, err } log.Infof(ctx, "Staged worker binary: %v", workerURL) + if err := graphx.UpdateDefaultEnvWorkerType( + graphx.URNArtifactURLType, + protox.MustEncode(&pipepb.ArtifactUrlPayload{ + Url: workerURL, + Sha256: hash, + }), raw); err != nil { + return presult, err + } + if opts.WorkerJar != "" { log.Infof(ctx, "Staging Dataflow worker jar: %v", opts.WorkerJar) - if err := StageFile(ctx, opts.Project, jarURL, opts.WorkerJar); err != nil { + if _, err := stageFile(ctx, opts.Project, jarURL, opts.WorkerJar); err != nil { return presult, err } log.Infof(ctx, "Staged worker jar: %v", jarURL) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go index a6e019ef36438..6bfe418c95f34 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go @@ -18,6 +18,8 @@ package dataflowlib import ( "bytes" "context" + "crypto/sha256" + "encoding/hex" "io" "os" @@ -33,15 +35,21 @@ func StageModel(ctx context.Context, project, modelURL string, model []byte) err return upload(ctx, project, modelURL, bytes.NewReader(model)) } -// StageFile uploads a file to GCS. -func StageFile(ctx context.Context, project, url, filename string) error { +// stageFile uploads a file to GCS, and returns the sha256 hash. +func stageFile(ctx context.Context, project, url, filename string) (string, error) { fd, err := os.Open(filename) if err != nil { - return errors.Wrapf(err, "failed to open file %s", filename) + return "", errors.Wrapf(err, "failed to open file %s", filename) } defer fd.Close() - return upload(ctx, project, url, fd) + sha256W := sha256.New() + tee := io.TeeReader(fd, sha256W) + if err := upload(ctx, project, url, tee); err != nil { + return "", err + } + hash := hex.EncodeToString(sha256W.Sum(nil)) + return hash, nil } func upload(ctx context.Context, project, object string, r io.Reader) error { @@ -74,7 +82,7 @@ func ResolveXLangArtifacts(ctx context.Context, edges []*graph.MultiEdge, projec } var urls []string for local, remote := range paths { - err := StageFile(ctx, project, remote, local) + _, err := stageFile(ctx, project, remote, local) if err != nil { return nil, errors.WithContextf(err, "staging file to %v", remote) } diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go index c7371dbd04c5b..8ade659506826 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go @@ -17,11 +17,16 @@ package runnerlib import ( "context" + "crypto/sha256" + "encoding/hex" + "io" "os" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/metricsx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" @@ -35,22 +40,6 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO // (1) Prepare job to obtain artifact staging instructions. presult := &universalPipelineResult{} - cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute) - if err != nil { - return presult, errors.WithContextf(err, "connecting to job service") - } - defer cc.Close() - client := jobpb.NewJobServiceClient(cc) - - prepID, artifactEndpoint, st, err := Prepare(ctx, client, p, opt) - if err != nil { - return presult, err - } - - log.Infof(ctx, "Prepared job with id: %v and staging token: %v", prepID, st) - - // (2) Stage artifacts. - bin := opt.Worker if bin == "" { if self, ok := IsWorkerCompatibleBinary(); ok { @@ -70,6 +59,26 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO } else { log.Infof(ctx, "Using specified worker binary: '%v'", bin) } + // Update pipeline's Go environment to refer to the correct binary. + if err := UpdateGoEnvironmentWorker(bin, p); err != nil { + return presult, err + } + + cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute) + if err != nil { + return presult, errors.WithContextf(err, "connecting to job service") + } + defer cc.Close() + client := jobpb.NewJobServiceClient(cc) + + prepID, artifactEndpoint, st, err := Prepare(ctx, client, p, opt) + if err != nil { + return presult, err + } + + log.Infof(ctx, "Prepared job with id: %v and staging token: %v", prepID, st) + + // (2) Stage artifacts. token, err := Stage(ctx, prepID, artifactEndpoint, bin, st) if err != nil { @@ -104,6 +113,29 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO return res, err } +func UpdateGoEnvironmentWorker(worker string, p *pipepb.Pipeline) error { + fd, err := os.Open(worker) + if err != nil { + return err + } + defer fd.Close() + + sha256W := sha256.New() + n, err := io.Copy(sha256W, fd) + if err != nil { + return errors.WithContextf(err, "unable to read worker binary %v, only read %d bytes", worker, n) + } + hash := hex.EncodeToString(sha256W.Sum(nil)) + pyld := protox.MustEncode(&pipepb.ArtifactFilePayload{ + Path: worker, + Sha256: hash, + }) + if err := graphx.UpdateDefaultEnvWorkerType(graphx.URNArtifactFileType, pyld, p); err != nil { + return err + } + return nil +} + type universalPipelineResult struct { jobID string metrics *metrics.Results diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go index 8103e640cf4e1..b1cd16324c880 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go @@ -83,11 +83,13 @@ func StageViaPortableApi(ctx context.Context, cc *grpc.ClientConn, binary, st st case *jobpb.ArtifactRequestWrapper_GetArtifact: switch typeUrn := request.GetArtifact.Artifact.TypeUrn; typeUrn { + // TODO(BEAM-13647): Legacy Type URN. If requested, provide the binary. + // To be removed later in 2022, once thoroughly obsolete. case graphx.URNArtifactGoWorker: if err := StageFile(binary, stream); err != nil { return errors.Wrap(err, "failed to stage Go worker binary") } - case "beam:artifact:type:file:v1": + case graphx.URNArtifactFileType: typePl := pipepb.ArtifactFilePayload{} if err := proto.Unmarshal(request.GetArtifact.Artifact.TypePayload, &typePl); err != nil { return errors.Wrap(err, "failed to parse artifact file payload")