From e16d401987d9656bf791afc0d4679b09b8f3fe04 Mon Sep 17 00:00:00 2001
From: Robert Bradshaw
Date: Tue, 5 Dec 2023 13:15:33 -0800
Subject: [PATCH] [YAML] Clean up some confusing error messages. (#29481)

* Provide sane defaults rather than NullPtrExceptions for optional BigQuery parameters.

* Better error when cross-language is used with (incompatible) local streaming Python runner.

* Add format and schema to readme PubSub examples.
---
 ...torageWriteApiSchemaTransformProvider.java | 11 +++++---
 .../runners/direct/direct_runner.py           | 11 ++++++++
 sdks/python/apache_beam/yaml/README.md        | 28 ++++++++++++++++++-
 sdks/python/apache_beam/yaml/readme_test.py   |  4 +--
 4 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 39e6fd7c809d..98cc246ce0dd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -338,10 +338,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
     if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
       Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
       Boolean autoSharding = configuration.getAutoSharding();
-      Integer numStreams = configuration.getNumStreams();
+      int numStreams = configuration.getNumStreams() == null ? 0 : configuration.getNumStreams();
+      boolean useAtLeastOnceSemantics =
+          configuration.getUseAtLeastOnceSemantics() == null
+              ? false
+              : configuration.getUseAtLeastOnceSemantics();
       // Triggering frequency is only applicable for exactly-once
-      if (configuration.getUseAtLeastOnceSemantics() == null
-          || !configuration.getUseAtLeastOnceSemantics()) {
+      if (!useAtLeastOnceSemantics) {
         write =
             write.withTriggeringFrequency(
                 (triggeringFrequency == null || triggeringFrequency <= 0)
@@ -349,7 +352,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
                     : Duration.standardSeconds(triggeringFrequency));
       }
       // set num streams if specified, otherwise default to autoSharding
-      if (numStreams != null && numStreams > 0) {
+      if (numStreams > 0) {
         write = write.withNumStorageWriteApiStreams(numStreams);
       } else if (autoSharding == null || autoSharding) {
         write = write.withAutoSharding();
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index db53e4122bbc..a470ba80d8ee 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -507,6 +507,17 @@ def run_pipeline(self, pipeline, options):
     from apache_beam.runners.direct.transform_evaluator import \
         TransformEvaluatorRegistry
     from apache_beam.testing.test_stream import TestStream
+    from apache_beam.transforms.external import ExternalTransform
+
+    class VerifyNoCrossLanguageTransforms(PipelineVisitor):
+      """Visitor verifying the pipeline uses no cross-language transforms."""
+      def visit_transform(self, applied_ptransform):
+        if isinstance(applied_ptransform.transform, ExternalTransform):
+          raise RuntimeError(
+              "Streaming Python direct runner "
+              "does not support cross-language pipelines.")
+
+    pipeline.visit(VerifyNoCrossLanguageTransforms())

     # If the TestStream I/O is used, use a mock test clock.
     class TestStreamUsageVisitor(PipelineVisitor):
diff --git a/sdks/python/apache_beam/yaml/README.md b/sdks/python/apache_beam/yaml/README.md
index e1528a8e4804..0b5c118ba8af 100644
--- a/sdks/python/apache_beam/yaml/README.md
+++ b/sdks/python/apache_beam/yaml/README.md
@@ -274,6 +274,13 @@ pipeline:
     - type: ReadFromPubSub
       config:
         topic: myPubSubTopic
+        format: json
+        schema:
+          type: object
+          properties:
+            col1: {type: string}
+            col2: {type: integer}
+            col3: {type: number}
     - type: WindowInto
       windowing:
         type: fixed
@@ -282,6 +289,7 @@
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
+        format: json
 ```

 Rather than using an explicit `WindowInto` operation, one may instead tag a
@@ -295,6 +303,8 @@ pipeline:
     - type: ReadFromPubSub
       config:
         topic: myPubSubTopic
+        format: ...
+        schema: ...
     - type: SomeAggregation
       windowing:
         type: sliding
@@ -303,6 +313,7 @@
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
+        format: json
 ```

 Note that the `Sql` operation itself is often a form of aggregation, and
@@ -316,6 +327,8 @@ pipeline:
     - type: ReadFromPubSub
       config:
         topic: myPubSubTopic
+        format: ...
+        schema: ...
     - type: Sql
       config:
         query: "select col1, count(*) as c from PCOLLECTION"
@@ -325,6 +338,7 @@
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
+        format: json
 ```

 The specified windowing is applied to all inputs, in this case resulting in
@@ -337,11 +351,15 @@ pipeline:
       name: ReadLeft
       config:
         topic: leftTopic
+        format: ...
+        schema: ...

     - type: ReadFromPubSub
       name: ReadRight
       config:
         topic: rightTopic
+        format: ...
+        schema: ...

     - type: Sql
       config:
@@ -364,7 +382,9 @@ pipeline:
   transforms:
     - type: ReadFromPubSub
       config:
-         topic: myPubSubTopic
+        topic: myPubSubTopic
+        format: ...
+        schema: ...
       windowing:
         type: fixed
         size: 60
@@ -374,6 +394,7 @@ pipeline:
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
+        format: json
 ```

 One can also specify windowing at the top level of a pipeline (or composite),
@@ -388,12 +409,15 @@ pipeline:
     - type: ReadFromPubSub
       config:
         topic: myPubSubTopic
+        format: ...
+        schema: ...
     - type: Sql
       config:
         query: "select col1, count(*) as c from PCOLLECTION"
     - type: WriteToPubSub
       config:
         topic: anotherPubSubTopic
+        format: json
   windowing:
     type: fixed
     size: 60
@@ -410,6 +434,8 @@
     type: ReadFromPubSub
     config:
       topic: myPubSubTopic
+      format: ...
+      schema: ...
     windowing:
       type: fixed
       size: 10
diff --git a/sdks/python/apache_beam/yaml/readme_test.py b/sdks/python/apache_beam/yaml/readme_test.py
index 7f2d193bf35f..8ec9b22cc92f 100644
--- a/sdks/python/apache_beam/yaml/readme_test.py
+++ b/sdks/python/apache_beam/yaml/readme_test.py
@@ -99,7 +99,7 @@ def guess_name_and_type(expr):


 class FakeReadFromPubSub(beam.PTransform):
-  def __init__(self, topic):
+  def __init__(self, topic, format, schema):
     pass

   def expand(self, p):
@@ -112,7 +112,7 @@ def expand(self, p):


 class FakeWriteToPubSub(beam.PTransform):
-  def __init__(self, topic):
+  def __init__(self, topic, format):
     pass

   def expand(self, pcoll):
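
A note on the first commit bullet: the Java change normalizes nullable optional parameters to concrete defaults once, up front, so later branches never dereference a null. A rough Python paraphrase of that pattern (illustrative only; the function name and return values here are invented, the real fix is the Java diff above):

```
# Rough paraphrase of the BigQuery provider fix: coalesce nullable optional
# parameters to defaults once, instead of null-checking at every use site.
def resolve_write_options(num_streams=None, use_at_least_once=None):
  num_streams = 0 if num_streams is None else num_streams
  use_at_least_once = False if use_at_least_once is None else use_at_least_once
  # Triggering frequency is only applicable for exactly-once semantics.
  needs_triggering_frequency = not use_at_least_once
  # Use an explicit stream count if given, otherwise fall back to auto-sharding.
  use_auto_sharding = num_streams <= 0
  return needs_triggering_frequency, use_auto_sharding

print(resolve_write_options())               # (True, True): defaults, no crash
print(resolve_write_options(num_streams=4))  # (True, False)
```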
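On the second bullet: the guard added to direct_runner.py uses Beam's `PipelineVisitor` traversal API. A minimal, self-contained sketch of the same technique (the pipeline and the `beam.ParDo` check are illustrative stand-ins; the patch raises a `RuntimeError` on `ExternalTransform` instead of counting):

```
# Sketch of the PipelineVisitor technique behind VerifyNoCrossLanguageTransforms:
# walk the constructed pipeline and inspect each leaf transform's type.
import apache_beam as beam
from apache_beam.pipeline import PipelineVisitor


class CountTransformType(PipelineVisitor):
  """Counts leaf transforms of a given type in a pipeline."""
  def __init__(self, transform_type):
    self.transform_type = transform_type
    self.count = 0

  def visit_transform(self, applied_ptransform):
    if isinstance(applied_ptransform.transform, self.transform_type):
      self.count += 1


p = beam.Pipeline()
_ = p | beam.Create([1, 2, 3]) | 'Double' >> beam.Map(lambda x: x * 2)
visitor = CountTransformType(beam.ParDo)  # Map(...) expands to a ParDo
p.visit(visitor)
print(visitor.count >= 1)  # True: the pipeline contains at least one ParDo
```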
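Finally, the readme_test.py change follows from how those tests exercise the README examples: the keys under each transform's `config:` block end up as constructor keyword arguments on the fake transforms, so once the examples gained `format:` and `schema:`, the fakes had to accept them. A simplified illustration of that assumed mapping (not the harness's actual plumbing):

```
# Assumed illustration: YAML config keys become constructor kwargs, so fakes
# must accept every key the README examples now use.
class FakeReadFromPubSub:
  def __init__(self, topic, format, schema):  # mirrors the updated fake
    self.topic = topic

config = {'topic': 'myPubSubTopic', 'format': 'json', 'schema': {'type': 'object'}}
reader = FakeReadFromPubSub(**config)  # without format/schema: TypeError
print(reader.topic)
```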