Skip to content

Commit

Permalink
[#31991][prism] Allow Empty Composites (#32024)
Browse files Browse the repository at this point in the history
Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
  • Loading branch information
lostluck and lostluck authored Jul 30, 2024
1 parent c624e02 commit 121ac71
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,21 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo

default:
// Composites can often have some unknown urn, permit those.
// Eg. The Python SDK has urns "beam:transform:generic_composite:v1", "beam:transform:pickled_python:v1", as well as the deprecated "beam:transform:read:v1",
// but they are composites. Since we don't do anything special with the high level, we simply use their internal subgraph.
// Eg. The Python SDK has urns "beam:transform:generic_composite:v1", "beam:transform:pickled_python:v1",
// as well as the deprecated "beam:transform:read:v1", but they are composites.
// We don't do anything special with these high level composites, but
// we may be dealing with their internal subgraph already, so we ignore this transform.
if len(t.GetSubtransforms()) > 0 {
continue
}
// But if not, fail.
// This may be an "empty" composite without subtransforms or a payload.
// These just do PCollection manipulation which is already represented in the Pipeline graph.
// Simply ignore the composite at this stage, since the runner does nothing with them.
if len(t.GetSpec().GetPayload()) == 0 {
continue
}
// Otherwise fail.
slog.Warn("unknown transform, with payload", "urn", urn, "name", t.GetUniqueName(), "payload", t.GetSpec().GetPayload())
check("PTransform.Spec.Urn", urn+" "+t.GetUniqueName(), "<doesn't exist>")
}
}
Expand Down

0 comments on commit 121ac71

Please sign in to comment.