[#28187][prism] Basic cross language support. #28545

Merged: 2 commits, Sep 20, 2023
6 changes: 6 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
@@ -251,6 +251,9 @@ func (s *decodeStream) Read() (*FullValue, error) {
}
err := s.d.DecodeTo(s.r, &s.ret)
if err != nil {
if err == io.EOF {
return nil, io.EOF
}
return nil, errors.Wrap(err, "decodeStream value decode failed")
}
s.next++
@@ -342,6 +345,9 @@ func (s *decodeMultiChunkStream) Read() (*FullValue, error) {
if s.chunk == 0 && s.next == 0 {
chunk, err := coder.DecodeVarInt(s.r.reader)
if err != nil {
if err == io.EOF {
return nil, io.EOF
}
return nil, errors.Wrap(err, "decodeMultiChunkStream chunk size decoding failed")
}
s.chunk = chunk
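Both hunks above return io.EOF bare instead of wrapping it, so stream consumers can tell normal end-of-stream apart from a real decode failure. A minimal standalone sketch of that convention (decodeNext and its signature are illustrative, not taken from the Beam codebase):

```go
package main

import (
	"fmt"
	"io"
)

// decodeNext mimics a stream decoder: when the input is exhausted it must
// surface io.EOF itself rather than a wrapped error, so callers can detect
// end-of-stream with a plain equality check.
func decodeNext(remaining *int) (int, error) {
	if *remaining == 0 {
		return 0, io.EOF // returned bare, as in the hunks above
	}
	*remaining--
	return *remaining, nil
}

func main() {
	remaining := 3
	for {
		v, err := decodeNext(&remaining)
		if err == io.EOF {
			fmt.Println("end of stream") // normal termination, not a failure
			return
		}
		if err != nil {
			fmt.Println("decode failed:", err) // a genuine error
			return
		}
		fmt.Println("value:", v)
	}
}
```

Wrapping the sentinel before returning it would make the err == io.EOF comparison fail and turn ordinary stream termination into a reported error.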
2 changes: 2 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/environments.go
@@ -99,6 +99,7 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo
pool.StopWorker(context.Background(), &fnpb.StopWorkerRequest{
WorkerId: wk.ID,
})
wk.Stop()
}

func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.DockerPayload, wk *worker.W, artifactEndpoint string) error {
@@ -170,6 +171,7 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock
// Start goroutine to wait on container state.
go func() {
defer cli.Close()
defer wk.Stop()

statusCh, errCh := cli.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
select {
52 changes: 29 additions & 23 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -20,6 +20,7 @@ import (
"fmt"
"io"
"sort"
"sync/atomic"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
@@ -46,15 +47,14 @@ func RunPipeline(j *jobservices.Job) {
// here, we only want and need the go one, operating
// in loopback mode.
envs := j.Pipeline.GetComponents().GetEnvironments()
if len(envs) != 1 {
j.Failed(fmt.Errorf("unable to execute multi-environment pipelines;\npipeline has environments: %+v", envs))
return
}
env, _ := getOnlyPair(envs)
wk, err := makeWorker(env, j)
if err != nil {
j.Failed(err)
return
wks := map[string]*worker.W{}
for envID := range envs {
wk, err := makeWorker(envID, j)
if err != nil {
j.Failed(err)
return
}
wks[envID] = wk
}
// When this function exits, we cancel the context to clear
// any related job resources.
@@ -65,15 +65,12 @@ func RunPipeline(j *jobservices.Job) {
j.SendMsg("running " + j.String())
j.Running()

if err := executePipeline(j.RootCtx, wk, j); err != nil {
if err := executePipeline(j.RootCtx, wks, j); err != nil {
j.Failed(err)
return
}
j.SendMsg("pipeline completed " + j.String())

// Stop the worker.
wk.Stop()

j.SendMsg("terminating " + j.String())
j.Done()
}
@@ -95,7 +92,7 @@ func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
// Check for connection succeeding after we've created the environment successfully.
timeout := 1 * time.Minute
time.AfterFunc(timeout, func() {
if wk.Connected() {
if wk.Connected() || wk.Stopped() {
Contributor:
Is there a case where Stopped() could be indicative of an error in worker start-up?

Contributor Author:

No. The stopped bit is only set when Stop is called, which only happens when a context has been cancelled, which only happens during post-job cleanup.

Other errors would be reported earlier (ideally).

This approach largely prevents this ("worker didn't connect") check from failing successful jobs after the job has completed, in the case where the job never needed the environment at all. I intend to change worker environments to start on demand, and to move worker startup to after preprocessing. As it stands, a minute-plus job using one of these environments will just die. That won't happen for real xlang transforms, though.

return
}
err := fmt.Errorf("prism %v didn't get control connection to %v after %v", wk, wk.Endpoint(), timeout)
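To illustrate the thread above: the startup timer only reports a failure when the worker has neither connected nor already been stopped by post-job cleanup. A minimal, self-contained sketch of that pattern (fakeWorker and watchStartup are stand-ins, not the prism worker API):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// fakeWorker stands in for the prism worker: connected flips when the SDK
// dials in, stopped flips during post-job cleanup.
type fakeWorker struct {
	connected, stopped atomic.Bool
}

func (w *fakeWorker) watchStartup(timeout time.Duration, fail func(error)) {
	time.AfterFunc(timeout, func() {
		// Either outcome means there is nothing to report: the worker came
		// up in time, or the job already tore it down.
		if w.connected.Load() || w.stopped.Load() {
			return
		}
		fail(fmt.Errorf("worker didn't get a control connection after %v", timeout))
	})
}

func main() {
	w := &fakeWorker{}
	w.watchStartup(50*time.Millisecond, func(err error) {
		fmt.Println("job failed:", err)
	})
	// Simulate the job finishing without ever needing this environment.
	w.stopped.Store(true)
	time.Sleep(100 * time.Millisecond) // let the timer fire (and do nothing)
	fmt.Println("done; no spurious failure")
}
```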
@@ -115,7 +112,7 @@ type processor struct {
transformExecuters map[string]transformExecuter
}

func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) error {
func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservices.Job) error {
pipeline := j.Pipeline
comps := proto.Clone(pipeline.GetComponents()).(*pipepb.Components)

@@ -158,7 +155,12 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
// TODO move this loop and code into the preprocessor instead.
stages := map[string]*stage{}
var impulses []string
for _, stage := range topo {

// Initialize the "dataservice cache" to support side inputs.
// TODO(https://github.com/apache/beam/issues/28543), remove this concept.
ds := &worker.DataService{}

for i, stage := range topo {
tid := stage.transforms[0]
t := ts[tid]
urn := t.GetSpec().GetUrn()
@@ -169,11 +171,11 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
if stage.exe != nil {
stage.envID = stage.exe.ExecuteWith(t)
}
stage.ID = wk.NextStage()
stage.ID = fmt.Sprintf("stage-%03d", i)
wk := wks[stage.envID]

switch stage.envID {
case "": // Runner Transforms

var onlyOut string
for _, out := range t.GetOutputs() {
onlyOut = out
@@ -232,10 +234,8 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
em.AddStage(stage.ID, inputs, nil, []string{getOnlyValue(t.GetOutputs())})
}
stages[stage.ID] = stage
wk.Descriptors[stage.ID] = stage.desc
case wk.Env:
// Great! this is for this environment. // Broken abstraction.
if err := buildDescriptor(stage, comps, wk); err != nil {
if err := buildDescriptor(stage, comps, wk, ds); err != nil {
return fmt.Errorf("prism error building stage %v: \n%w", stage.ID, err)
}
stages[stage.ID] = stage
@@ -259,7 +259,12 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
maxParallelism := make(chan struct{}, 8)
// Execute stages here
bundleFailed := make(chan error)
bundles := em.Bundles(ctx, wk.NextInst)

var instID uint64
bundles := em.Bundles(ctx, func() string {
return fmt.Sprintf("inst%03d", atomic.AddUint64(&instID, 1))
})

for {
select {
case <-ctx.Done():
@@ -273,7 +278,8 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
go func(rb engine.RunBundle) {
defer func() { <-maxParallelism }()
s := stages[rb.StageID]
if err := s.Execute(ctx, j, wk, comps, em, rb); err != nil {
wk := wks[s.envID]
if err := s.Execute(ctx, j, wk, ds, comps, em, rb); err != nil {
// Ensure we clean up on bundle failure
em.FailBundle(rb)
bundleFailed <- err
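Because a job can now hold several workers, the execute.go changes above move instruction ID generation out of the worker and into a job-scoped atomic counter passed to em.Bundles as a closure. A rough sketch of that pattern in isolation (the two simulated workers and the channel bookkeeping are illustrative only):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	// One counter for the whole job, shared by every environment's worker,
	// so instruction IDs stay unique across workers.
	var instID uint64
	nextInst := func() string {
		return fmt.Sprintf("inst%03d", atomic.AddUint64(&instID, 1))
	}

	var wg sync.WaitGroup
	ids := make(chan string, 6)
	for w := 0; w < 2; w++ { // two "workers" drawing from the same source
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 3; i++ {
				ids <- nextInst()
			}
		}()
	}
	wg.Wait()
	close(ids)
	for id := range ids {
		fmt.Println(id) // six distinct IDs, e.g. inst001 ... inst006
	}
}
```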
20 changes: 13 additions & 7 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -93,14 +93,18 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
return nil, err
}
var errs []error
check := func(feature string, got, want any) {
if got != want {
err := unimplementedError{
feature: feature,
value: got,
check := func(feature string, got any, wants ...any) {
for _, want := range wants {
if got == want {
return
}
errs = append(errs, err)
}

err := unimplementedError{
feature: feature,
value: got,
}
errs = append(errs, err)
}

// Inspect Transforms for unsupported features.
@@ -114,6 +118,8 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
urns.TransformGBK,
urns.TransformFlatten,
urns.TransformCombinePerKey,
urns.TransformCombineGlobally, // Used by Java SDK
urns.TransformCombineGroupedValues, // Used by Java SDK
urns.TransformAssignWindows:
// Very few expected transforms types for submitted pipelines.
// Most URNs are for the runner to communicate back to the SDK for execution.
Expand Down Expand Up @@ -154,7 +160,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
check("WindowingStrategy.MergeStatus", ws.GetMergeStatus(), pipepb.MergeStatus_NON_MERGING)
}
if !bypassedWindowingStrategies[wsID] {
check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY)
check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY, pipepb.OnTimeBehavior_FIRE_ALWAYS)
check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW)
// Non nil triggers should fail.
if ws.GetTrigger().GetDefault() == nil {
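The reworked check helper above accepts any number of acceptable values and only records an unimplemented-feature error when the observed value matches none of them. A standalone sketch of the same shape, with a plain fmt.Errorf standing in for prism's unimplementedError type:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	var errs []error
	// check records an error unless got equals one of the accepted values.
	check := func(feature string, got any, wants ...any) {
		for _, want := range wants {
			if got == want {
				return
			}
		}
		errs = append(errs, fmt.Errorf("unimplemented %v: %v", feature, got))
	}

	check("OnTimeBehavior", "FIRE_ALWAYS", "FIRE_IF_NONEMPTY", "FIRE_ALWAYS") // accepted
	check("MergeStatus", "MERGING", "NON_MERGING")                            // recorded

	if err := errors.Join(errs...); err != nil {
		fmt.Println(err) // unimplemented MergeStatus: MERGING
	}
}
```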
16 changes: 8 additions & 8 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -75,7 +75,7 @@ type stage struct {
OutputsToCoders map[string]engine.PColInfo
}

func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) error {
func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, ds *worker.DataService, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) error {
slog.Debug("Execute: starting bundle", "bundle", rb)

var b *worker.B
@@ -204,8 +204,8 @@ progress:
md := wk.MonitoringMetadata(ctx, unknownIDs)
j.AddMetricShortIDs(md)
}
// TODO handle side input data properly.
wk.D.Commit(b.OutputData)
// TODO(https://github.com/apache/beam/issues/28543) handle side input data properly.
ds.Commit(b.OutputData)
var residualData [][]byte
var minOutputWatermark map[string]mtime.Time
for _, rr := range resp.GetResidualRoots() {
@@ -270,7 +270,7 @@ func portFor(wInCid string, wk *worker.W) []byte {
// It assumes that the side inputs are not sourced from PCollections generated by any transform in this stage.
//
// Because we need the local ids for routing the sources/sinks information.
func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, ds *worker.DataService) error {
// Assume stage has an indicated primary input

coders := map[string]*pipepb.Coder{}
@@ -327,7 +327,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
// Update side inputs to point to new PCollection with any replaced coders.
transforms[si.transform].GetInputs()[si.local] = newGlobal
}
prepSide, err := handleSideInput(si.transform, si.local, si.global, comps, coders, wk)
prepSide, err := handleSideInput(si.transform, si.local, si.global, comps, coders, ds)
if err != nil {
slog.Error("buildDescriptor: handleSideInputs", err, slog.String("transformID", si.transform))
return err
@@ -392,7 +392,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
}

// handleSideInput returns a closure that will look up the data for a side input appropriate for the given watermark.
func handleSideInput(tid, local, global string, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W) (func(b *worker.B, watermark mtime.Time), error) {
func handleSideInput(tid, local, global string, comps *pipepb.Components, coders map[string]*pipepb.Coder, ds *worker.DataService) (func(b *worker.B, watermark mtime.Time), error) {
t := comps.GetTransforms()[tid]
sis, err := getSideInputs(t)
if err != nil {
@@ -412,7 +412,7 @@ func handleSideInput(tid, local, global string, comps *pipepb.Components, coders

global, local := global, local
return func(b *worker.B, watermark mtime.Time) {
data := wk.D.GetAllData(global)
data := ds.GetAllData(global)

if b.IterableSideInputData == nil {
b.IterableSideInputData = map[string]map[string]map[typex.Window][][]byte{}
@@ -447,7 +447,7 @@ func handleSideInput(tid, local, global string, comps *pipepb.Components, coders
global, local := global, local
return func(b *worker.B, watermark mtime.Time) {
// May be of zero length, but that's OK. Side inputs can be empty.
data := wk.D.GetAllData(global)
data := ds.GetAllData(global)
if b.MultiMapSideInputData == nil {
b.MultiMapSideInputData = map[string]map[string]map[typex.Window]map[string][][]byte{}
}
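As described above, handleSideInput returns a closure per side input; when a bundle is prepared, the closure pulls everything committed for the side input's PCollection and files it under the bundle keyed by transform, local input name, and window. A rough, self-contained sketch of that closure-returning shape (the bundle struct, window type, and getAllData callback are simplified stand-ins; the real closure selects windows from a watermark):

```go
package main

import "fmt"

type window string // stand-in for typex.Window

// bundle is a stripped-down stand-in for worker.B.
type bundle struct {
	IterableSideInputData map[string]map[string]map[window][][]byte
}

// prepSideInput returns a closure that copies whatever data exists for the
// global PCollection into the bundle's side-input map when the bundle runs.
func prepSideInput(tid, local, global string, getAllData func(string) [][]byte) func(*bundle, window) {
	return func(b *bundle, w window) {
		data := getAllData(global) // may be empty; empty side inputs are fine
		if b.IterableSideInputData == nil {
			b.IterableSideInputData = map[string]map[string]map[window][][]byte{}
		}
		if b.IterableSideInputData[tid] == nil {
			b.IterableSideInputData[tid] = map[string]map[window][][]byte{}
		}
		b.IterableSideInputData[tid][local] = map[window][][]byte{w: data}
	}
}

func main() {
	cache := map[string][][]byte{"pcol_side": {[]byte("x")}}
	prep := prepSideInput("t1", "side0", "pcol_side", func(id string) [][]byte { return cache[id] })

	var b bundle
	prep(&b, "GlobalWindow")
	fmt.Println(len(b.IterableSideInputData["t1"]["side0"]["GlobalWindow"])) // 1
}
```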
2 changes: 2 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/urns/urns.go
@@ -57,7 +57,9 @@ var (
// SDK transforms.
TransformParDo = ptUrn(pipepb.StandardPTransforms_PAR_DO)
TransformCombinePerKey = ctUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY)
TransformCombineGlobally = ctUrn(pipepb.StandardPTransforms_COMBINE_GLOBALLY)
TransformReshuffle = ctUrn(pipepb.StandardPTransforms_RESHUFFLE)
TransformCombineGroupedValues = cmbtUrn(pipepb.StandardPTransforms_COMBINE_GROUPED_VALUES)
TransformPreCombine = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_PRECOMBINE)
TransformMerge = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_MERGE_ACCUMULATORS)
TransformExtract = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_EXTRACT_OUTPUTS)
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -93,7 +93,7 @@ func (b *B) Respond(resp *fnpb.InstructionResponse) {
}
b.responded = true
if resp.GetError() != "" {
b.BundleErr = fmt.Errorf("bundle %v failed:%v", resp.GetInstructionId(), resp.GetError())
b.BundleErr = fmt.Errorf("bundle %v %v failed:%v", resp.GetInstructionId(), b.PBDID, resp.GetError())
close(b.Resp)
return
}
24 changes: 12 additions & 12 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -67,7 +67,7 @@ type W struct {
server *grpc.Server

// These are the ID sources
inst, bund uint64
inst uint64
Contributor:
This is now being pulled from the ProcessBundle instruction abstraction directly, right?

Contributor Author:

Just the bundle descriptor IDs.

The shorter-lived RPCs like Progress and Split still use the instruction IDs from the worker side, rather than the "global" one in execute.go used by the ElementManager.

I haven't yet nailed down the best way to have "global" state accessed by the worker abstraction, vs dedicated state (like environment protos). It's getting there though.

connected, stopped atomic.Bool

InstReqs chan *fnpb.InstructionRequest
Expand All @@ -76,8 +76,6 @@ type W struct {
mu sync.Mutex
activeInstructions map[string]controlResponder // Active instructions keyed by InstructionID
Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID

D *DataService
}

type controlResponder interface {
@@ -104,8 +102,6 @@ func New(id, env string) *W {

activeInstructions: make(map[string]controlResponder),
Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),

D: &DataService{},
}
slog.Debug("Serving Worker components", slog.String("endpoint", wk.Endpoint()))
fnpb.RegisterBeamFnControlServer(wk.server, wk)
@@ -149,11 +145,7 @@ func (wk *W) Stop() {
}

func (wk *W) NextInst() string {
return fmt.Sprintf("inst%03d", atomic.AddUint64(&wk.inst, 1))
}

func (wk *W) NextStage() string {
return fmt.Sprintf("stage%03d", atomic.AddUint64(&wk.bund, 1))
return fmt.Sprintf("inst-%v-%03d", wk.Env, atomic.AddUint64(&wk.inst, 1))
}

// TODO set logging level.
@@ -263,6 +255,11 @@ func (wk *W) Connected() bool {
return wk.connected.Load()
}

// Stopped indicates that the worker has stopped.
func (wk *W) Stopped() bool {
return wk.stopped.Load()
}

// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
//
// Requests come from the runner, and are sent to the client in the SDK.
@@ -312,10 +309,12 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
wk.mu.Lock()
// Fail extant instructions
slog.Debug("SDK Disconnected", "worker", wk, "ctx_error", ctrl.Context().Err(), "outstanding_instructions", len(wk.activeInstructions))

msg := fmt.Sprintf("SDK worker disconnected: %v, %v active instructions", wk.String(), len(wk.activeInstructions))
for instID, b := range wk.activeInstructions {
b.Respond(&fnpb.InstructionResponse{
InstructionId: instID,
Error: "SDK Disconnected",
Error: msg,
})
}
wk.mu.Unlock()
@@ -536,7 +535,7 @@ func (wk *W) sendInstruction(ctx context.Context, req *fnpb.InstructionRequest)

req.InstructionId = progInst

if wk.stopped.Load() {
if wk.Stopped() {
return nil
}
wk.InstReqs <- req
@@ -566,6 +565,7 @@ func (wk *W) MonitoringMetadata(ctx context.Context, unknownIDs []string) *fnpb.

// DataService is slated to be deleted in favour of stage based state
// management for side inputs.
// TODO(https://github.com/apache/beam/issues/28543), remove this concept.
type DataService struct {
mu sync.Mutex
// TODO actually quick process the data to windows here as well.
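Per the TODO above, DataService is a temporary, job-scoped cache: stages Commit the data they produce, and side-input preparation later reads it back with GetAllData. A minimal guess at what such a Commit/GetAllData cache could look like (the dataCache type and its internals are assumptions, not the Beam implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// dataCache is a guess at the shape of a Commit/GetAllData side-input cache:
// a mutex-guarded map from PCollection ID to the raw encoded elements.
type dataCache struct {
	mu   sync.Mutex
	data map[string][][]byte
}

// Commit appends each output's elements under its PCollection ID.
func (d *dataCache) Commit(output map[string][][]byte) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.data == nil {
		d.data = map[string][][]byte{}
	}
	for pcol, elems := range output {
		d.data[pcol] = append(d.data[pcol], elems...)
	}
}

// GetAllData returns everything committed so far for one PCollection.
func (d *dataCache) GetAllData(pcol string) [][]byte {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.data[pcol]
}

func main() {
	ds := &dataCache{}
	ds.Commit(map[string][][]byte{"pcol_side": {[]byte("a"), []byte("b")}})
	fmt.Println(len(ds.GetAllData("pcol_side"))) // 2
}
```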