From f26030ad77b726c4687430305090fef64bb71cb3 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 8 Jun 2024 16:09:17 -0400 Subject: [PATCH 1/2] warn when using auto_unique_labels --- sdks/python/apache_beam/pipeline.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index a20fae2ae735c..b03603a83c998 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -694,6 +694,10 @@ def apply( if auto_unique_labels: # If auto_unique_labels is set, we will append a unique suffix to the # label to make it unique. + logging.warning( + 'Using --auto_unique_labels could cause data loss when ' + 'updating a pipeline or reloading the job state. ' + 'This is not recommended for streaming jobs.') unique_label = self._generate_unique_label(transform) return self.apply(transform, pvalueish, unique_label) else: From 17b7325df531c5127b21878544ab8a11d90c270e Mon Sep 17 00:00:00 2001 From: liferoad Date: Fri, 14 Jun 2024 12:24:30 -0400 Subject: [PATCH 2/2] Added more notes --- sdks/python/apache_beam/options/pipeline_options.py | 5 ++++- sdks/python/apache_beam/pipeline.py | 5 ++++- sdks/python/apache_beam/pipeline_test.py | 5 ++++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 42aee47a957e8..1773b5de8969c 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -556,7 +556,10 @@ def _add_argparse_args(cls, parser): action='store_true', help='Whether to automatically generate unique transform labels ' 'for every transform. The default behavior is to raise an ' - 'exception if a transform is created with a non-unique label.') + 'exception if a transform is created with a non-unique label. ' + 'Using --auto_unique_labels could cause data loss when ' + 'updating a pipeline or reloading the job state. ' + 'This is not recommended for streaming jobs.') class StreamingOptions(PipelineOptions): diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index b03603a83c998..128ab9206bfd4 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -706,7 +706,10 @@ def apply( 'To apply a transform with a specified label, write ' 'pvalue | "label" >> transform or use the option ' '"auto_unique_labels" to automatically generate unique ' - 'transform labels' % full_label) + 'transform labels. Note "auto_unique_labels" ' + 'could cause data loss when updating a pipeline or ' + 'reloading the job state. This is not recommended for ' + 'streaming jobs.' % full_label) self.applied_labels.add(full_label) pvalueish, inputs = transform._extract_input_pvalues(pvalueish) diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 8d6cf4aceea01..b11211668e900 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -267,7 +267,10 @@ def test_reuse_custom_transform_instance(self): 'pipeline. To apply a transform with a specified label, write ' 'pvalue | "label" >> transform or use the option ' '"auto_unique_labels" to automatically generate unique ' - 'transform labels') + 'transform labels. Note "auto_unique_labels" ' + 'could cause data loss when updating a pipeline or ' + 'reloading the job state. This is not recommended for ' + 'streaming jobs.') def test_auto_unique_labels(self):