diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 3526ee00cc1f..5760ce7871b7 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -127,6 +127,9 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo urns.TransformCombinePerKey, urns.TransformCombineGlobally, // Used by Java SDK urns.TransformCombineGroupedValues, // Used by Java SDK + urns.TransformMerge, // Used directly by Python SDK if "pre-optimized" + urns.TransformPreCombine, // Used directly by Python SDK if "pre-optimized" + urns.TransformExtract, // Used directly by Python SDK if "pre-optimized" urns.TransformAssignWindows: // Very few expected transforms types for submitted pipelines. // Most URNs are for the runner to communicate back to the SDK for execution. @@ -165,12 +168,6 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo check("OnWindowExpirationTimerFamily", pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now. - case "": - // Composites can often have no spec - if len(t.GetSubtransforms()) > 0 { - continue - } - fallthrough case urns.TransformTestStream: var testStream pipepb.TestStreamPayload if err := proto.Unmarshal(t.GetSpec().GetPayload(), &testStream); err != nil { @@ -179,7 +176,15 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo t.EnvironmentId = "" // Unset the environment, to ensure it's handled prism side. testStreamIds = append(testStreamIds, tid) + default: + // Composites can often have some unknown urn, permit those. + // Eg. The Python SDK has urns "beam:transform:generic_composite:v1", "beam:transform:pickled_python:v1", as well as the deprecated "beam:transform:read:v1", + // but they are composites. Since we don't do anything special with the high level, we simply use their internal subgraph. + if len(t.GetSubtransforms()) > 0 { + continue + } + // But if not, fail. check("PTransform.Spec.Urn", urn+" "+t.GetUniqueName(), "") } } @@ -191,7 +196,8 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo // Inspect Windowing strategies for unsupported features. for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() { check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0)) - check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY) + // Both Closing behaviors are identical without additional trigger firings. + check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY, pipepb.ClosingBehavior_EMIT_ALWAYS) check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING) if ws.GetWindowFn().GetUrn() != urns.WindowFnSession { check("WindowingStrategy.MergeStatus", ws.GetMergeStatus(), pipepb.MergeStatus_NON_MERGING) @@ -398,3 +404,50 @@ func (s *Server) GetState(_ context.Context, req *jobpb.GetJobStateRequest) (*jo Timestamp: timestamppb.New(j.stateTime), }, nil } + +// DescribePipelineOptions is a no-op since it's unclear how it is to function. +// Apparently only implemented in the Python SDK. +func (s *Server) DescribePipelineOptions(context.Context, *jobpb.DescribePipelineOptionsRequest) (*jobpb.DescribePipelineOptionsResponse, error) { + return &jobpb.DescribePipelineOptionsResponse{ + Options: []*jobpb.PipelineOptionDescriptor{}, + }, nil +} + +// GetStateStream returns the job state as it changes. +func (s *Server) GetStateStream(req *jobpb.GetJobStateRequest, stream jobpb.JobService_GetStateStreamServer) error { + s.mu.Lock() + job, ok := s.jobs[req.GetJobId()] + s.mu.Unlock() + if !ok { + return fmt.Errorf("job with id %v not found", req.GetJobId()) + } + + job.streamCond.L.Lock() + defer job.streamCond.L.Unlock() + + state := job.state.Load().(jobpb.JobState_Enum) + for { + job.streamCond.L.Unlock() + stream.Send(&jobpb.JobStateEvent{ + State: state, + Timestamp: timestamppb.Now(), + }) + job.streamCond.L.Lock() + switch state { + case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_UPDATED, jobpb.JobState_FAILED: + // Reached terminal state. + return nil + } + newState := job.state.Load().(jobpb.JobState_Enum) + for state == newState { + select { // Quit out if the external connection is done. + case <-stream.Context().Done(): + return context.Cause(stream.Context()) + default: + } + job.streamCond.Wait() + newState = job.state.Load().(jobpb.JobState_Enum) + } + state = newState + } +}