diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index 3efe48e23119..7676d958031c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -180,12 +180,21 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo default: // Composites can often have some unknown urn, permit those. - // Eg. The Python SDK has urns "beam:transform:generic_composite:v1", "beam:transform:pickled_python:v1", as well as the deprecated "beam:transform:read:v1", - // but they are composites. Since we don't do anything special with the high level, we simply use their internal subgraph. + // Eg. The Python SDK has urns "beam:transform:generic_composite:v1", "beam:transform:pickled_python:v1", + // as well as the deprecated "beam:transform:read:v1", but they are composites. + // We don't do anything special with these high level composites, but + // we may be dealing with their internal subgraph already, so we ignore this transform. if len(t.GetSubtransforms()) > 0 { continue } - // But if not, fail. + // This may be an "empty" composite without subtransforms or a payload. + // These just do PCollection manipulation which is already represented in the Pipeline graph. + // Simply ignore the composite at this stage, since the runner does nothing with them. + if len(t.GetSpec().GetPayload()) == 0 { + continue + } + // Otherwise fail. + slog.Warn("unknown transform, with payload", "urn", urn, "name", t.GetUniqueName(), "payload", t.GetSpec().GetPayload()) check("PTransform.Spec.Urn", urn+" "+t.GetUniqueName(), "") } }