Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#31403] Relax prism constraints to allow python wordcount to execute. #31644

Merged
merged 1 commit into from
Jun 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 60 additions & 7 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
urns.TransformCombinePerKey,
urns.TransformCombineGlobally, // Used by Java SDK
urns.TransformCombineGroupedValues, // Used by Java SDK
urns.TransformMerge, // Used directly by Python SDK if "pre-optimized"
urns.TransformPreCombine, // Used directly by Python SDK if "pre-optimized"
urns.TransformExtract, // Used directly by Python SDK if "pre-optimized"
urns.TransformAssignWindows:
// Very few expected transforms types for submitted pipelines.
// Most URNs are for the runner to communicate back to the SDK for execution.
Expand Down Expand Up @@ -165,12 +168,6 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo

check("OnWindowExpirationTimerFamily", pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now.

case "":
// Composites can often have no spec
if len(t.GetSubtransforms()) > 0 {
continue
}
fallthrough
case urns.TransformTestStream:
var testStream pipepb.TestStreamPayload
if err := proto.Unmarshal(t.GetSpec().GetPayload(), &testStream); err != nil {
Expand All @@ -179,7 +176,15 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo

t.EnvironmentId = "" // Unset the environment, to ensure it's handled prism side.
testStreamIds = append(testStreamIds, tid)

default:
// Composites can often have some unknown urn, permit those.
// Eg. The Python SDK has urns "beam:transform:generic_composite:v1", "beam:transform:pickled_python:v1", as well as the deprecated "beam:transform:read:v1",
// but they are composites. Since we don't do anything special with the high level, we simply use their internal subgraph.
if len(t.GetSubtransforms()) > 0 {
continue
}
// But if not, fail.
check("PTransform.Spec.Urn", urn+" "+t.GetUniqueName(), "<doesn't exist>")
}
}
Expand All @@ -191,7 +196,8 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
// Inspect Windowing strategies for unsupported features.
for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() {
check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0))
check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY)
// Both Closing behaviors are identical without additional trigger firings.
check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY, pipepb.ClosingBehavior_EMIT_ALWAYS)
check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING)
if ws.GetWindowFn().GetUrn() != urns.WindowFnSession {
check("WindowingStrategy.MergeStatus", ws.GetMergeStatus(), pipepb.MergeStatus_NON_MERGING)
Expand Down Expand Up @@ -398,3 +404,50 @@ func (s *Server) GetState(_ context.Context, req *jobpb.GetJobStateRequest) (*jo
Timestamp: timestamppb.New(j.stateTime),
}, nil
}

// DescribePipelineOptions is a no-op since it's unclear how it is to function.
// Apparently only implemented in the Python SDK.
func (s *Server) DescribePipelineOptions(context.Context, *jobpb.DescribePipelineOptionsRequest) (*jobpb.DescribePipelineOptionsResponse, error) {
return &jobpb.DescribePipelineOptionsResponse{
Options: []*jobpb.PipelineOptionDescriptor{},
}, nil
}

// GetStateStream returns the job state as it changes.
func (s *Server) GetStateStream(req *jobpb.GetJobStateRequest, stream jobpb.JobService_GetStateStreamServer) error {
s.mu.Lock()
job, ok := s.jobs[req.GetJobId()]
s.mu.Unlock()
if !ok {
return fmt.Errorf("job with id %v not found", req.GetJobId())
}

job.streamCond.L.Lock()
defer job.streamCond.L.Unlock()

state := job.state.Load().(jobpb.JobState_Enum)
for {
job.streamCond.L.Unlock()
stream.Send(&jobpb.JobStateEvent{
State: state,
Timestamp: timestamppb.Now(),
})
job.streamCond.L.Lock()
switch state {
case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_UPDATED, jobpb.JobState_FAILED:
// Reached terminal state.
return nil
}
newState := job.state.Load().(jobpb.JobState_Enum)
for state == newState {
select { // Quit out if the external connection is done.
case <-stream.Context().Done():
return context.Cause(stream.Context())
default:
}
job.streamCond.Wait()
newState = job.state.Load().(jobpb.JobState_Enum)
}
state = newState
}
}
Loading