Skip to content

Commit

Permalink
[BEAM-13647] Use role for Go worker binary. (apache#16729)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck authored Feb 8, 2022
1 parent 7bb7fab commit 56e5c2c
Show file tree
Hide file tree
Showing 9 changed files with 303 additions and 39 deletions.
6 changes: 6 additions & 0 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,12 @@ message StandardArtifacts {
// A URN for pip-requirements-file role.
// payload: None
PIP_REQUIREMENTS_FILE = 1 [(beam_urn) = "beam:artifact:role:pip_requirements_file:v1"];

// A URN for the Go worker binary role.
// This represents the executable for a Go SDK environment.
// A Go environment may have one such artifact with this role.
// payload: None
GO_WORKER_BINARY = 2 [(beam_urn) = "beam:artifact:role:go_worker_binary:v1"];
}
}

Expand Down
16 changes: 14 additions & 2 deletions sdks/go/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func main() {
log.Fatalf("Failed to retrieve staged files: %v", err)
}

// TODO(BEAM-13647): Remove legacy hack once aged out.
const worker = "worker"
name := worker

Expand All @@ -110,12 +111,23 @@ func main() {
default:
found := false
for _, a := range artifacts {
n, _ := artifact.MustExtractFilePayload(a)
if n == worker {
if a.GetRoleUrn() == artifact.URNGoWorkerBinaryRole {
name, _ = artifact.MustExtractFilePayload(a)
found = true
break
}
}
// TODO(BEAM-13647): Remove legacy hack once aged out.
if !found {
for _, a := range artifacts {
n, _ := artifact.MustExtractFilePayload(a)
if n == worker {
found = true
log.Printf("Go worker binary found with legacy name '%v' found", worker)
break
}
}
}
if !found {
log.Fatalf("No artifact named '%v' found", worker)
}
Expand Down
5 changes: 3 additions & 2 deletions sdks/go/pkg/beam/artifact/materialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
const (
URNFileArtifact = "beam:artifact:type:file:v1"
URNUrlArtifact = "beam:artifact:type:url:v1"
URNGoWorkerBinaryRole = "beam:artifact:role:go_worker_binary:v1"
URNPipRequirementsFile = "beam:artifact:role:pip_requirements_file:v1"
URNStagingTo = "beam:artifact:role:staging_to:v1"
NoArtifactsStaged = "__no_artifacts_staged__"
Expand Down Expand Up @@ -172,11 +173,11 @@ func extractStagingToPath(artifact *pipepb.ArtifactInformation) (string, error)

func MustExtractFilePayload(artifact *pipepb.ArtifactInformation) (string, string) {
if artifact.TypeUrn != URNFileArtifact {
log.Fatalf("Unsupported artifact type #{artifact.TypeUrn}")
log.Fatalf("Unsupported artifact type %v", artifact.TypeUrn)
}
ty := pipepb.ArtifactFilePayload{}
if err := proto.Unmarshal(artifact.TypePayload, &ty); err != nil {
log.Fatalf("failed to parse artifact file payload: #{err}")
log.Fatalf("failed to parse artifact file payload: %v", err)
}
return ty.Path, ty.Sha256
}
Expand Down
48 changes: 38 additions & 10 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,17 @@ const (

URNRequiresSplittableDoFn = "beam:requirement:pardo:splittable_dofn:v1"

URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1"
URNArtifactStagingTo = "beam:artifact:role:staging_to:v1"
// Deprecated: Determine worker binary based on GoWorkerBinary Role instead.
URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1"

URNArtifactFileType = "beam:artifact:type:file:v1"
URNArtifactURLType = "beam:artifact:type:url:v1"
URNArtifactGoWorkerRole = "beam:artifact:role:go_worker_binary:v1"

// Environment Urns.
URNEnvProcess = "beam:env:process:v1"
URNEnvExternal = "beam:env:external:v1"
URNEnvDocker = "beam:env:docker:v1"
)

func goCapabilities() []string {
Expand All @@ -85,14 +94,14 @@ func goCapabilities() []string {
func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig func(context.Context) string) (*pipepb.Environment, error) {
var serializedPayload []byte
switch urn {
case "beam:env:process:v1":
case URNEnvProcess:
// TODO Support process based SDK Harness.
return nil, errors.Errorf("unsupported environment %v", urn)
case "beam:env:external:v1":
case URNEnvExternal:
config := extractEnvironmentConfig(ctx)
payload := &pipepb.ExternalPayload{Endpoint: &pipepb.ApiServiceDescriptor{Url: config}}
serializedPayload = protox.MustEncode(payload)
case "beam:env:docker:v1":
case URNEnvDocker:
fallthrough
default:
config := extractEnvironmentConfig(ctx)
Expand All @@ -105,11 +114,9 @@ func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig
Capabilities: goCapabilities(),
Dependencies: []*pipepb.ArtifactInformation{
{
TypeUrn: URNArtifactGoWorker,
RoleUrn: URNArtifactStagingTo,
RolePayload: protox.MustEncode(&pipepb.ArtifactStagingToRolePayload{
StagedName: "worker",
}),
TypeUrn: URNArtifactFileType,
TypePayload: protox.MustEncode(&pipepb.ArtifactFilePayload{}),
RoleUrn: URNArtifactGoWorkerRole,
},
},
}, nil
Expand Down Expand Up @@ -981,6 +988,7 @@ func boolToBounded(bounded bool) pipepb.IsBounded_Enum {
return pipepb.IsBounded_UNBOUNDED
}

// defaultEnvId is the environment ID used for Go Pipeline Environments.
const defaultEnvId = "go"

func (m *marshaller) addDefaultEnv() string {
Expand Down Expand Up @@ -1259,3 +1267,23 @@ func nodeID(n *graph.Node) string {
func scopeID(s *graph.Scope) string {
return fmt.Sprintf("s%v", s.ID())
}

// UpdateDefaultEnvWorkerType is so runners can update the pipeline's default environment
// with the correct artifact type and payload for the Go worker binary.
func UpdateDefaultEnvWorkerType(typeUrn string, pyld []byte, p *pipepb.Pipeline) error {
// Get the Go environment out.
envs := p.GetComponents().GetEnvironments()
env, ok := envs[defaultEnvId]
if !ok {
return errors.Errorf("unable to find default Go environment with ID %q", defaultEnvId)
}
for _, dep := range env.GetDependencies() {
if dep.RoleUrn != URNArtifactGoWorkerRole {
continue
}
dep.TypeUrn = typeUrn
dep.TypePayload = pyld
return nil
}
return errors.Errorf("unable to find dependency with %q role in environment with ID %q,", URNArtifactGoWorkerRole, defaultEnvId)
}
165 changes: 165 additions & 0 deletions sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@
package graphx_test

import (
"context"
"reflect"
"testing"

"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/testing/protocmp"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -206,3 +210,164 @@ func (fn *splitPickFn) CreateTracker(_ int) *testRT { return &testRT{}
func (fn *splitPickFn) ProcessElement(_ *testRT, a int, small, big func(int)) {
pickFn(a, small, big)
}

func TestCreateEnvironment(t *testing.T) {
t.Run("process", func(t *testing.T) {
const wantEnv = "process"
urn := graphx.URNEnvProcess
got, err := graphx.CreateEnvironment(context.Background(), urn, func(_ context.Context) string { return wantEnv })
if err == nil {
t.Errorf("CreateEnvironment(%v) = %v error, want error since it's unsupported", urn, err)
}
want := (*pipepb.Environment)(nil)
if !proto.Equal(got, want) {
t.Errorf("CreateEnvironment(%v) = %v, want %v since it's unsupported", urn, got, want)
}
})
tests := []struct {
name string
urn string
payload func(name string) []byte
}{
{
name: "external",
urn: graphx.URNEnvExternal,
payload: func(name string) []byte {
return protox.MustEncode(&pipepb.ExternalPayload{
Endpoint: &pipepb.ApiServiceDescriptor{
Url: name,
},
})
},
}, {
name: "docker",
urn: graphx.URNEnvDocker,
payload: func(name string) []byte {
return protox.MustEncode(&pipepb.DockerPayload{
ContainerImage: name,
})
},
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
got, err := graphx.CreateEnvironment(context.Background(), test.urn, func(_ context.Context) string { return test.name })
if err != nil {
t.Errorf("CreateEnvironment(%v) = %v error, want nil", test.urn, err)
}
want := &pipepb.Environment{
Urn: test.urn,
Payload: test.payload(test.name),
Dependencies: []*pipepb.ArtifactInformation{
{
TypeUrn: graphx.URNArtifactFileType,
TypePayload: protox.MustEncode(&pipepb.ArtifactFilePayload{}),
RoleUrn: graphx.URNArtifactGoWorkerRole,
},
},
}
opts := []cmp.Option{
protocmp.Transform(),
// Ignore the capabilities field, since we can't access that method here.
protocmp.IgnoreFields(&pipepb.Environment{}, protoreflect.Name("capabilities")),
}
if d := cmp.Diff(want, got, opts...); d != "" {
t.Errorf("CreateEnvironment(%v) diff (-want, +got):\n%v", test.urn, d)
}
})
}
}

func TestUpdateDefaultEnvWorkerType(t *testing.T) {
t.Run("noEnvs", func(t *testing.T) {
if err := graphx.UpdateDefaultEnvWorkerType("unused", nil, &pipepb.Pipeline{
Components: &pipepb.Components{},
}); err == nil {
t.Error("UpdateDefaultEnvWorkerType(<emptyEnv>) no error, want err")
}
})
t.Run("noGoEnvs", func(t *testing.T) {
if err := graphx.UpdateDefaultEnvWorkerType("unused", nil, &pipepb.Pipeline{
Components: &pipepb.Components{
Environments: map[string]*pipepb.Environment{
"java": {Urn: "java"},
"python": {Urn: "python"},
"typescript": {Urn: "typescript"},
},
},
}); err == nil {
t.Error("UpdateDefaultEnvWorkerType(<noGoEnvs>) no error, want err")
}
})
t.Run("badGoEnv", func(t *testing.T) {
if err := graphx.UpdateDefaultEnvWorkerType("unused", nil, &pipepb.Pipeline{
Components: &pipepb.Components{
Environments: map[string]*pipepb.Environment{
"java": {Urn: "java"},
"python": {Urn: "python"},
"typescript": {Urn: "typescript"},
"go": {
Urn: "test",
Payload: []byte("test"),
Dependencies: []*pipepb.ArtifactInformation{
{
RoleUrn: "unset",
},
},
},
},
},
}); err == nil {
t.Error("UpdateDefaultEnvWorkerType(<badGoEnv>) no error, want err")
}
})
t.Run("goEnv", func(t *testing.T) {
wantUrn := graphx.URNArtifactFileType
wantPyld := protox.MustEncode(&pipepb.ArtifactFilePayload{
Path: "good",
})
p := &pipepb.Pipeline{
Components: &pipepb.Components{
Environments: map[string]*pipepb.Environment{
"java": {Urn: "java"},
"python": {Urn: "python"},
"typescript": {Urn: "typescript"},
"go": {
Urn: "test",
Payload: []byte("test"),
Dependencies: []*pipepb.ArtifactInformation{
{
TypeUrn: "to be removed",
TypePayload: nil,
RoleUrn: graphx.URNArtifactGoWorkerRole,
},
},
},
},
},
}
if err := graphx.UpdateDefaultEnvWorkerType(wantUrn, wantPyld, p); err != nil {
t.Errorf("UpdateDefaultEnvWorkerType(<goEnv>) = %v, want nil", err)
}
got := p.GetComponents().GetEnvironments()["go"]
want := &pipepb.Environment{
Urn: "test",
Payload: []byte("test"),
Dependencies: []*pipepb.ArtifactInformation{
{
TypeUrn: wantUrn,
TypePayload: wantPyld,
RoleUrn: graphx.URNArtifactGoWorkerRole,
},
},
}
opts := []cmp.Option{
protocmp.Transform(),
}
if d := cmp.Diff(want, got, opts...); d != "" {
t.Errorf("UpdateDefaultEnvWorkerType(<goEnv>) diff (-want, +got):\n%v", d)
}
})

}
16 changes: 13 additions & 3 deletions sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
Expand Down Expand Up @@ -59,16 +60,25 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker
}

log.Infof(ctx, "Staging worker binary: %v", bin)

if err := StageFile(ctx, opts.Project, workerURL, bin); err != nil {
hash, err := stageFile(ctx, opts.Project, workerURL, bin)
if err != nil {
return presult, err
}
log.Infof(ctx, "Staged worker binary: %v", workerURL)

if err := graphx.UpdateDefaultEnvWorkerType(
graphx.URNArtifactURLType,
protox.MustEncode(&pipepb.ArtifactUrlPayload{
Url: workerURL,
Sha256: hash,
}), raw); err != nil {
return presult, err
}

if opts.WorkerJar != "" {
log.Infof(ctx, "Staging Dataflow worker jar: %v", opts.WorkerJar)

if err := StageFile(ctx, opts.Project, jarURL, opts.WorkerJar); err != nil {
if _, err := stageFile(ctx, opts.Project, jarURL, opts.WorkerJar); err != nil {
return presult, err
}
log.Infof(ctx, "Staged worker jar: %v", jarURL)
Expand Down
Loading

0 comments on commit 56e5c2c

Please sign in to comment.