Skip to content

Commit

Permalink
warn when using auto_unique_labels (#31551)
Browse files: browse the repository at this point in the history
* warn when using auto_unique_labels

* Added more notes
  • Loading branch information
liferoad committed Jun 18, 2024
1 parent 2b90785 commit 33267ea
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 3 deletions.
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,10 @@ def _add_argparse_args(cls, parser):
action='store_true',
help='Whether to automatically generate unique transform labels '
'for every transform. The default behavior is to raise an '
'exception if a transform is created with a non-unique label.')
'exception if a transform is created with a non-unique label. '
'Using --auto_unique_labels could cause data loss when '
'updating a pipeline or reloading the job state. '
'This is not recommended for streaming jobs.')


class StreamingOptions(PipelineOptions):
Expand Down
9 changes: 8 additions & 1 deletion sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,10 @@ def apply(
if auto_unique_labels:
# If auto_unique_labels is set, we will append a unique suffix to the
# label to make it unique.
logging.warning(
'Using --auto_unique_labels could cause data loss when '
'updating a pipeline or reloading the job state. '
'This is not recommended for streaming jobs.')
unique_label = self._generate_unique_label(transform)
return self.apply(transform, pvalueish, unique_label)
else:
Expand All @@ -702,7 +706,10 @@ def apply(
'To apply a transform with a specified label, write '
'pvalue | "label" >> transform or use the option '
'"auto_unique_labels" to automatically generate unique '
'transform labels' % full_label)
'transform labels. Note "auto_unique_labels" '
'could cause data loss when updating a pipeline or '
'reloading the job state. This is not recommended for '
'streaming jobs.' % full_label)
self.applied_labels.add(full_label)

pvalueish, inputs = transform._extract_input_pvalues(pvalueish)
Expand Down
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,10 @@ def test_reuse_custom_transform_instance(self):
'pipeline. To apply a transform with a specified label, write '
'pvalue | "label" >> transform or use the option '
'"auto_unique_labels" to automatically generate unique '
'transform labels')
'transform labels. Note "auto_unique_labels" '
'could cause data loss when updating a pipeline or '
'reloading the job state. This is not recommended for '
'streaming jobs.')

def test_auto_unique_labels(self):

Expand Down

0 comments on commit 33267ea

Please sign in to comment.