From 82f3e234873676d5267504ce4893575c5b2996a3 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Wed, 27 Mar 2024 21:51:06 -0400 Subject: [PATCH 01/31] Remove unavailable transforms from catalog --- .../content/en/documentation/transforms/python/overview.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/website/www/site/content/en/documentation/transforms/python/overview.md b/website/www/site/content/en/documentation/transforms/python/overview.md index a8982fa9798e..644aa0d1b514 100644 --- a/website/www/site/content/en/documentation/transforms/python/overview.md +++ b/website/www/site/content/en/documentation/transforms/python/overview.md @@ -54,7 +54,6 @@ limitations under the License. CombineGloballyTransforms to combine elements. CombinePerKeyTransforms to combine elements for each key. CombineValuesTransforms to combine keyed iterables. - CombineWithContextNot available. CountCounts the number of elements within each aggregation. DistinctProduces a collection containing distinct elements from the input collection. GroupByKeyTakes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key. @@ -77,10 +76,8 @@ limitations under the License. FlattenGiven multiple input collections, produces a single output collection containing all elements from all of the input collections. - PAssertNot available. ReshuffleGiven an input collection, redistributes the elements between workers. This is most useful for adjusting parallelism or preventing coupled failures. - ViewNot available. WindowIntoLogically divides up or groups the elements of a collection into finite windows according to a function. 
From b6a45038d5ab6de71df191f58e8ce427fa470651 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Wed, 27 Mar 2024 22:13:57 -0400 Subject: [PATCH 02/31] Implement approximatequantile example --- .../aggregation/approximatequantiles.py | 51 +++++++++++++++++++ .../aggregation/approximatequantiles_test.py | 51 +++++++++++++++++++ .../aggregation/approximatequantiles.md | 9 +++- 3 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py create mode 100644 sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py new file mode 100644 index 000000000000..973d178a8118 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py @@ -0,0 +1,51 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# pytype: skip-file +# pylint:disable=line-too-long + +# beam-playground: +# name: AppromximateQuantiles +# description: Demonstration of ApproximateQuantiles transform usage. +# multifile: false +# default_example: false +# context_line: 43 +# categories: +# complexity: BASIC +# tags: +# - transforms +# - integers + + +def approximatequantiles(test=None): + # [START quantiles] + import apache_beam as beam + from apache_beam.transforms.stats import ApproximateQuantiles + + with beam.Pipeline() as pipeline: + quantiles = (pipeline + | 'Create data' >> beam.Create(list(range(1001))) + | 'Compute quantiles' >> ApproximateQuantiles.Globally(5) + | beam.Map(print)) + # [END approximatequantiles] + if test: + test(quantiles) + + +if __name__ == '__main__': + cogroupbykey() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py new file mode 100644 index 000000000000..7be0b71193fe --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py @@ -0,0 +1,51 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# pytype: skip-file + +import unittest + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that, equal_to + +from . import approximatequantiles + +@mock.patch('apache_beam.Pipeline', TestPipeline) +@mock.patch( + 'apache_beam.examples.snippets.transforms.aggregation.approximatequantiles.print', + lambda x: x) +class ApproximateQuantilesTest(unittest.TestCase): + def test_approximatequantiles(self): + def check_result(quantiles): + assert_that( + quantiles, + equal_to([[ + 0, + 250, + 500, + 750, + 1000 + ]])) + + approximatequantiles.approximatequantiles(test=check_result) + + +if __name__ == '__main__': + unittest.main() diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/approximatequantiles.md b/website/www/site/content/en/documentation/transforms/python/aggregation/approximatequantiles.md index adb6d09e768a..64c4455e7a3a 100644 --- a/website/www/site/content/en/documentation/transforms/python/aggregation/approximatequantiles.md +++ b/website/www/site/content/en/documentation/transforms/python/aggregation/approximatequantiles.md @@ -17,7 +17,14 @@ limitations under the License. # ApproximateQuantiles +{{< localstorage language language-py >}} + +{{< button-pydoc path="apache_beam.transforms.stat" class="ApproximateQuantile" >}} + ## Examples -See [Issue 19547](https://github.com/apache/beam/issues/19547) for updates. 
+ +{{< playground height="700px" >}} +{{< playground_snippet language="py" path="SDK_PYTHON_ApproximateQuantiles" show="groupintobatches" >}} +{{< /playground >}} ## Related transforms From f0a75ba5833a56ba93c79c8446ad9cf3c441170d Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Wed, 27 Mar 2024 22:25:44 -0400 Subject: [PATCH 03/31] add context to groupbykey example --- .../katas/python/Core Transforms/GroupByKey/GroupByKey/task.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/learning/katas/python/Core Transforms/GroupByKey/GroupByKey/task.py b/learning/katas/python/Core Transforms/GroupByKey/GroupByKey/task.py index 7e5d3fb954f9..2832de39cad4 100644 --- a/learning/katas/python/Core Transforms/GroupByKey/GroupByKey/task.py +++ b/learning/katas/python/Core Transforms/GroupByKey/GroupByKey/task.py @@ -27,6 +27,7 @@ # - group # - strings +# [START groupbykey] import apache_beam as beam with beam.Pipeline() as p: @@ -35,3 +36,4 @@ | beam.Map(lambda word: (word[0], word)) | beam.GroupByKey() | beam.LogElements()) +# [END groupbykey] From b8fde1f98d3d26a253c98999771179a57458e4b4 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Wed, 27 Mar 2024 22:26:41 -0400 Subject: [PATCH 04/31] add in live groupbykey sort in example --- .../transforms/python/aggregation/groupbykey.md | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/groupbykey.md b/website/www/site/content/en/documentation/transforms/python/aggregation/groupbykey.md index 5ba04bc2fe1a..e3f287da7d04 100644 --- a/website/www/site/content/en/documentation/transforms/python/aggregation/groupbykey.md +++ b/website/www/site/content/en/documentation/transforms/python/aggregation/groupbykey.md @@ -32,17 +32,9 @@ See more information in the [Beam Programming Guide](/documentation/programming- We use `GroupByKey` to group all the produce for each season. 
-{{< highlight language="py" file="sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey.py" >}} -{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey.py" groupbykey >}} -{{< /highlight >}} - -{{< paragraph class="notebook-skip" >}} -Output: -{{< /paragraph >}} - -{{< highlight class="notebook-skip" >}} -{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey_test.py" produce_counts >}} -{{< /highlight >}} +{{< playground height="700px" >}} +{{< playground_snippet language="py" path="SDK_PYTHON_GroupByKey" show="groupbykeysort" >}} +{{< /playground >}} **Example 2**: From 7736c4d047955bf6b9c4402698bd9aeaeb1c0a00 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 07:37:34 -0400 Subject: [PATCH 05/31] add approximatequantiles example and update catalog --- .../site/content/en/documentation/transforms/python/overview.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/www/site/content/en/documentation/transforms/python/overview.md b/website/www/site/content/en/documentation/transforms/python/overview.md index 644aa0d1b514..f52a9f4c9556 100644 --- a/website/www/site/content/en/documentation/transforms/python/overview.md +++ b/website/www/site/content/en/documentation/transforms/python/overview.md @@ -48,7 +48,7 @@ limitations under the License. 
- + From 25c38124441c9fd0b182b8d4d09499e992d50d7e Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 09:48:58 -0400 Subject: [PATCH 06/31] Add approximateunique example and link to catalog --- .../aggregation/approximateunique.py | 57 +++++++++++++++++++ .../aggregation/approximateunique_test.py | 51 +++++++++++++++++ .../python/aggregation/approximateunique.md | 9 ++- 3 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py create mode 100644 sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py new file mode 100644 index 000000000000..02041df28316 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py @@ -0,0 +1,57 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file +# pylint:disable=line-too-long + +# beam-playground: +# name: AppromximateUnique +# description: Demonstration of ApproximateUnique transform usage. 
+# multifile: false +# default_example: false +# context_line: 43 +# categories: +# complexity: BASIC +# tags: +# - transforms +# - integers + + +def approximateunique(test=None): + # [START approximateunique] + import apache_beam as beam + from apache_beam.transforms.stats import ApproximateUnique + import random + + with beam.Pipeline() as pipeline: + data = list(range(1000)) + random.shuffle(data) + + with beam.Pipeline() as pipeline: + result = ( + pipeline + | 'create' >> beam.Create(data) + | 'get_estimate' >> ApproximateUnique.Globally( + size=16) + | beam.Map(print) + ) + # [END approximateunique] + if test: + test(result) + + diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py new file mode 100644 index 000000000000..bb05542b164e --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py @@ -0,0 +1,51 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import math +# pytype: skip-file + +import unittest +import apache_beam as beam + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that, equal_to + +from . import approximateunique + +@mock.patch('apache_beam.Pipeline', TestPipeline) +@mock.patch( + 'apache_beam.examples.snippets.transforms.aggregation.approximateunique.print', + lambda x: x) +class ApproximateUniqueTest(unittest.TestCase): + def test_approximateunique(self): + def check_result(approx_count): + actual_count = 1000 + sample_size = 16 + error = 2 / math.sqrt(sample_size) + assert_that( + approx_count + | 'compare' >> beam.FlatMap( + lambda x: [abs(x - actual_count) * 1.0 / actual_count <= error]), + equal_to([True])) + + approximateunique.approximateunique(test=check_result) + + +if __name__ == '__main__': + unittest.main() diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/approximateunique.md b/website/www/site/content/en/documentation/transforms/python/aggregation/approximateunique.md index dbdfb51e46f9..dae287198fe5 100644 --- a/website/www/site/content/en/documentation/transforms/python/aggregation/approximateunique.md +++ b/website/www/site/content/en/documentation/transforms/python/aggregation/approximateunique.md @@ -16,7 +16,14 @@ limitations under the License. --> # ApproximateUnique +{{< localstorage language language-py >}} + +{{< button-pydoc path="apache_beam.transforms.stat" class="ApproximateUnique" >}} + ## Examples -See [Issue 19547](https://github.com/apache/beam/issues/19547) for updates. 
+ +{{< playground height="700px" >}} +{{< playground_snippet language="py" path="SDK_PYTHON_ApproximateUnique" show="approximateunique" >}} +{{< /playground >}} ## Related transforms From 18bd106d2908537b70210f135001b2cc8fde7dce Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 09:52:03 -0400 Subject: [PATCH 07/31] fix approxquantiles reference --- .../transforms/python/aggregation/approximatequantiles.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/approximatequantiles.md b/website/www/site/content/en/documentation/transforms/python/aggregation/approximatequantiles.md index 64c4455e7a3a..95f66490f5eb 100644 --- a/website/www/site/content/en/documentation/transforms/python/aggregation/approximatequantiles.md +++ b/website/www/site/content/en/documentation/transforms/python/aggregation/approximatequantiles.md @@ -24,7 +24,7 @@ limitations under the License. ## Examples {{< playground height="700px" >}} -{{< playground_snippet language="py" path="SDK_PYTHON_ApproximateQuantiles" show="groupintobatches" >}} +{{< playground_snippet language="py" path="SDK_PYTHON_ApproximateQuantiles" show="approximatequantiles" >}} {{< /playground >}} ## Related transforms From 7376ac51bccf77e11d40cd3b89756b53eb2f834c Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 09:53:46 -0400 Subject: [PATCH 08/31] update approximateunique catalog entry --- .../site/content/en/documentation/transforms/python/overview.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/www/site/content/en/documentation/transforms/python/overview.md b/website/www/site/content/en/documentation/transforms/python/overview.md index f52a9f4c9556..45814cada9ce 100644 --- a/website/www/site/content/en/documentation/transforms/python/overview.md +++ b/website/www/site/content/en/documentation/transforms/python/overview.md @@ -49,7 +49,7 @@ limitations under the License.
TransformDescription
ApproximateQuantilesNot available. See BEAM-6694 for updates.
ApproximateQuantilesGiven a distribution, find the approximate N-tiles.
ApproximateUniqueNot available. See BEAM-6693 for updates.
CoGroupByKeyTakes several keyed collections of elements and produces a collection where each element consists of a key and all values associated with that key.
CombineGloballyTransforms to combine elements.
- + From ffa6a839041d9bd8c0b64f77afd10eab6616865b Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 10:18:05 -0400 Subject: [PATCH 09/31] Add batchelements into catalog --- .../transforms/aggregation/batchelements.py | 64 +++++++++++++++++++ .../aggregation/batchelements_test.py | 51 +++++++++++++++ .../transforms/python/overview.md | 1 + 3 files changed, 116 insertions(+) create mode 100644 sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements.py create mode 100644 sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements.py new file mode 100644 index 000000000000..9e1bb7c6fea5 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements.py @@ -0,0 +1,64 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file +# pylint:disable=line-too-long + +# beam-playground: +# name: BatchElements +# description: Demonstration of BatchElements transform usage. 
+# multifile: false +# default_example: false +# context_line: 39 +# categories: +# - Core Transforms +# complexity: BASIC +# tags: +# - transforms +# - strings +# - group + + +def batchelements(test=None): + # [START batchelements] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + batches = ( + pipeline + | 'Create produce' >> beam.Create([ + '🍓', + '🥕', + '🍆', + '🍅', + '🥕', + '🍅', + '🌽', + '🥕', + '🍅', + '🍆', + ]) + | beam.BatchElements(min_batch_size=3, max_batch_size=5) + | beam.Map(print)) + # [END batchelements] + if test: + test(batches) + + +if __name__ == '__main__': + batchelements() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py new file mode 100644 index 000000000000..2368f23acff1 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py @@ -0,0 +1,51 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import equal_to, assert_that + +from . 
import batchelements + + +def check_batches(actual): + expected = [['🍓', '🥕', '🍆'], + ['🍅', '🥕', '🍅'], + ['🌽', '🥕', '🍅', '🍆']] + assert_that( + actual, equal_to(expected)) + + +def identity(x): + return x +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch( + 'apache_beam.examples.snippets.transforms.aggregation.batchelements.print', + identity) +# pylint: enable=line-too-long +class BatchElementsTest(unittest.TestCase): + def test_batchelements(self): + batchelements.batchelements(check_batches) + + +if __name__ == '__main__': + unittest.main() diff --git a/website/www/site/content/en/documentation/transforms/python/overview.md b/website/www/site/content/en/documentation/transforms/python/overview.md index 45814cada9ce..433b8d1829b9 100644 --- a/website/www/site/content/en/documentation/transforms/python/overview.md +++ b/website/www/site/content/en/documentation/transforms/python/overview.md @@ -50,6 +50,7 @@ limitations under the License. + From ecc0c983a597e8f7d29c8a16d6b0fa069a6d4334 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 10:19:10 -0400 Subject: [PATCH 10/31] Conform approx examples to referencing transforms directly on beam --- .../snippets/transforms/aggregation/approximatequantiles.py | 3 +-- .../snippets/transforms/aggregation/approximateunique.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py index 973d178a8118..0e05cf9661f1 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py @@ -35,12 +35,11 @@ def approximatequantiles(test=None): # [START quantiles] import apache_beam as beam - from apache_beam.transforms.stats import ApproximateQuantiles with 
beam.Pipeline() as pipeline: quantiles = (pipeline | 'Create data' >> beam.Create(list(range(1001))) - | 'Compute quantiles' >> ApproximateQuantiles.Globally(5) + | 'Compute quantiles' >> beam.ApproximateQuantiles.Globally(5) | beam.Map(print)) # [END approximatequantiles] if test: diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py index 02041df28316..c5723b536f3d 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py @@ -35,7 +35,6 @@ def approximateunique(test=None): # [START approximateunique] import apache_beam as beam - from apache_beam.transforms.stats import ApproximateUnique import random with beam.Pipeline() as pipeline: @@ -46,7 +45,7 @@ def approximateunique(test=None): result = ( pipeline | 'create' >> beam.Create(data) - | 'get_estimate' >> ApproximateUnique.Globally( + | 'get_estimate' >> beam.ApproximateUnique.Globally( size=16) | beam.Map(print) ) From fe13579354612d76693eb058c6be06dbc993ae8a Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 10:19:12 -0400 Subject: [PATCH 11/31] yapf --- .../aggregation/approximatequantiles.py | 15 ++++++++------- .../aggregation/approximatequantiles_test.py | 11 ++--------- .../transforms/aggregation/approximateunique.py | 8 ++------ .../aggregation/approximateunique_test.py | 9 +++++---- .../transforms/aggregation/batchelements_test.py | 11 +++++------ 5 files changed, 22 insertions(+), 32 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py index 0e05cf9661f1..a7701f21c10d 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py +++ 
b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py @@ -37,13 +37,14 @@ def approximatequantiles(test=None): import apache_beam as beam with beam.Pipeline() as pipeline: - quantiles = (pipeline - | 'Create data' >> beam.Create(list(range(1001))) - | 'Compute quantiles' >> beam.ApproximateQuantiles.Globally(5) - | beam.Map(print)) - # [END approximatequantiles] - if test: - test(quantiles) + quantiles = ( + pipeline + | 'Create data' >> beam.Create(list(range(1001))) + | 'Compute quantiles' >> beam.ApproximateQuantiles.Globally(5) + | beam.Map(print)) + # [END approximatequantiles] + if test: + test(quantiles) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py index 7be0b71193fe..51e99eb75030 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py @@ -27,6 +27,7 @@ from . 
import approximatequantiles + @mock.patch('apache_beam.Pipeline', TestPipeline) @mock.patch( 'apache_beam.examples.snippets.transforms.aggregation.approximatequantiles.print', @@ -34,15 +35,7 @@ class ApproximateQuantilesTest(unittest.TestCase): def test_approximatequantiles(self): def check_result(quantiles): - assert_that( - quantiles, - equal_to([[ - 0, - 250, - 500, - 750, - 1000 - ]])) + assert_that(quantiles, equal_to([[0, 250, 500, 750, 1000]])) approximatequantiles.approximatequantiles(test=check_result) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py index c5723b536f3d..f4ff2a0fc7f5 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py @@ -45,12 +45,8 @@ def approximateunique(test=None): result = ( pipeline | 'create' >> beam.Create(data) - | 'get_estimate' >> beam.ApproximateUnique.Globally( - size=16) - | beam.Map(print) - ) + | 'get_estimate' >> beam.ApproximateUnique.Globally(size=16) + | beam.Map(print)) # [END approximateunique] if test: test(result) - - diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py index bb05542b164e..8eeadce3b224 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py @@ -28,6 +28,7 @@ from . 
import approximateunique + @mock.patch('apache_beam.Pipeline', TestPipeline) @mock.patch( 'apache_beam.examples.snippets.transforms.aggregation.approximateunique.print', @@ -39,10 +40,10 @@ def check_result(approx_count): sample_size = 16 error = 2 / math.sqrt(sample_size) assert_that( - approx_count - | 'compare' >> beam.FlatMap( - lambda x: [abs(x - actual_count) * 1.0 / actual_count <= error]), - equal_to([True])) + approx_count + | 'compare' >> beam.FlatMap( + lambda x: [abs(x - actual_count) * 1.0 / actual_count <= error]), + equal_to([True])) approximateunique.approximateunique(test=check_result) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py index 2368f23acff1..144936bc1512 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py @@ -27,15 +27,14 @@ def check_batches(actual): - expected = [['🍓', '🥕', '🍆'], - ['🍅', '🥕', '🍅'], - ['🌽', '🥕', '🍅', '🍆']] - assert_that( - actual, equal_to(expected)) + expected = [['🍓', '🥕', '🍆'], ['🍅', '🥕', '🍅'], ['🌽', '🥕', '🍅', '🍆']] + assert_that(actual, equal_to(expected)) def identity(x): - return x + return x + + @mock.patch('apache_beam.Pipeline', TestPipeline) # pylint: disable=line-too-long @mock.patch( From 3bc373aaaf30013dd4666d32bc1f34e2838f6af0 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 10:25:55 -0400 Subject: [PATCH 12/31] add batchelements example page --- .../python/aggregation/batchelements.md | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 website/www/site/content/en/documentation/transforms/python/aggregation/batchelements.md diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/batchelements.md 
b/website/www/site/content/en/documentation/transforms/python/aggregation/batchelements.md new file mode 100644 index 000000000000..c9d6a7ac6d07 --- /dev/null +++ b/website/www/site/content/en/documentation/transforms/python/aggregation/batchelements.md @@ -0,0 +1,31 @@ +--- +title: "BatchElements" +--- + + +# BatchElements + +{{< localstorage language language-py >}} + +{{< button-pydoc path="apache_beam.transforms.stat" class="BatchElements" >}} + +## Examples + +{{< playground height="700px" >}} +{{< playground_snippet language="py" path="SDK_PYTHON_BatchElements" show="batchelements" >}} +{{< /playground >}} + +## Related transforms +* [GroupIntoBatches](/documentation/transforms/python/aggregation/groupintobatches) batches elements by key From 83be45a0ca3901d5c517f1692f216c161e9b7f06 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 10:30:31 -0400 Subject: [PATCH 13/31] fix approximateunique extra pipeline --- .../aggregation/approximateunique.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py index f4ff2a0fc7f5..c0466214966b 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py @@ -40,13 +40,11 @@ def approximateunique(test=None): with beam.Pipeline() as pipeline: data = list(range(1000)) random.shuffle(data) - - with beam.Pipeline() as pipeline: - result = ( - pipeline - | 'create' >> beam.Create(data) - | 'get_estimate' >> beam.ApproximateUnique.Globally(size=16) - | beam.Map(print)) - # [END approximateunique] - if test: - test(result) + result = ( + pipeline + | 'create' >> beam.Create(data) + | 'get_estimate' >> beam.ApproximateUnique.Globally(size=16) + | beam.Map(print)) + # [END 
approximateunique] + if test: + test(result) From 340dbfcffddaeb2920ad069a12f2d021467c8e53 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 12:55:48 -0400 Subject: [PATCH 14/31] fix approximatequantiles --- .../snippets/transforms/aggregation/approximatequantiles.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py index a7701f21c10d..e95fdc84e88e 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py @@ -48,4 +48,4 @@ def approximatequantiles(test=None): if __name__ == '__main__': - cogroupbykey() + approximatequantiles() From 144c9813547a80849d8907449b9d285e554559c7 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 13:29:39 -0400 Subject: [PATCH 15/31] fix lint --- .../transforms/aggregation/approximatequantiles_test.py | 3 ++- .../snippets/transforms/aggregation/approximateunique_test.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py index 51e99eb75030..b52baf8c36da 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py @@ -30,7 +30,8 @@ @mock.patch('apache_beam.Pipeline', TestPipeline) @mock.patch( - 'apache_beam.examples.snippets.transforms.aggregation.approximatequantiles.print', + 'apache_beam.examples.snippets.transforms.aggregation.' 
+ 'approximatequantiles.print', lambda x: x) class ApproximateQuantilesTest(unittest.TestCase): def test_approximatequantiles(self): diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py index 8eeadce3b224..04af3341c0e6 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py @@ -31,7 +31,8 @@ @mock.patch('apache_beam.Pipeline', TestPipeline) @mock.patch( - 'apache_beam.examples.snippets.transforms.aggregation.approximateunique.print', + 'apache_beam.examples.snippets.transforms.aggregation.' + 'approximateunique.print', lambda x: x) class ApproximateUniqueTest(unittest.TestCase): def test_approximateunique(self): From e7fcf7010943874faf6bc503f1aa1ece0454adc4 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 14:08:15 -0400 Subject: [PATCH 16/31] fix lint --- .../transforms/aggregation/approximatequantiles_test.py | 3 ++- .../transforms/aggregation/approximateunique_test.py | 5 +++-- .../snippets/transforms/aggregation/batchelements_test.py | 3 ++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py index b52baf8c36da..2adfcd05b99a 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py @@ -23,7 +23,8 @@ import mock from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that, equal_to +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import 
equal_to from . import approximatequantiles diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py index 04af3341c0e6..9ad019e93d64 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py @@ -19,12 +19,13 @@ # pytype: skip-file import unittest -import apache_beam as beam import mock +import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that, equal_to +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to from . import approximateunique diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py index 144936bc1512..701f87c7c028 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py @@ -21,7 +21,8 @@ import mock from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import equal_to, assert_that +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to from . 
import batchelements From 471a89ae9f2d1da21ba79142bdd72f4c01899f6b Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 14:15:48 -0400 Subject: [PATCH 17/31] fix typo in distinct example --- .../en/documentation/transforms/python/aggregation/distinct.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/distinct.md b/website/www/site/content/en/documentation/transforms/python/aggregation/distinct.md index 24abe6bdd247..e6701d63baea 100644 --- a/website/www/site/content/en/documentation/transforms/python/aggregation/distinct.md +++ b/website/www/site/content/en/documentation/transforms/python/aggregation/distinct.md @@ -30,7 +30,7 @@ In the following example, we create a pipeline with two `PCollection`s of produc We use `Distinct` to get rid of duplicate elements, which outputs a `PCollection` of all the unique elements. {{< playground height="700px" >}} -{{< playground_snippet language="py" path="SDK_PYTHON_Distinct" show="distinc" >}} +{{< playground_snippet language="py" path="SDK_PYTHON_Distinct" show="distinct" >}} {{< /playground >}} ## Related transforms From b2d7fdc1336ec45d61e435d5369171a160badb19 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 21:30:34 -0400 Subject: [PATCH 18/31] isort --- .../aggregation/approximatequantiles_test.py | 6 ++---- .../transforms/aggregation/approximateunique.py | 3 ++- .../aggregation/approximateunique_test.py | 14 ++++++++------ .../transforms/aggregation/batchelements_test.py | 6 ++---- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py index 2adfcd05b99a..a064da947562 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py +++ 
b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py @@ -18,13 +18,11 @@ # pytype: skip-file -import unittest - import mock +import unittest from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to +from apache_beam.testing.util import assert_that, equal_to from . import approximatequantiles diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py index c0466214966b..baf2918b2c16 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py @@ -34,9 +34,10 @@ def approximateunique(test=None): # [START approximateunique] - import apache_beam as beam import random + import apache_beam as beam + with beam.Pipeline() as pipeline: data = list(range(1000)) random.shuffle(data) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py index 9ad019e93d64..56eaac37f1b7 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py @@ -16,19 +16,21 @@ # limitations under the License. # import math -# pytype: skip-file - -import unittest - import mock +import unittest import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to +from apache_beam.testing.util import assert_that, equal_to from . 
import approximateunique +# pytype: skip-file + + + + + @mock.patch('apache_beam.Pipeline', TestPipeline) @mock.patch( diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py index 701f87c7c028..8bce77dd21a0 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py @@ -16,13 +16,11 @@ # limitations under the License. # -import unittest - import mock +import unittest from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that -from apache_beam.testing.util import equal_to +from apache_beam.testing.util import assert_that, equal_to from . import batchelements From 0e8429d7301aff2d7ea82a05b1945cb9aa1d5832 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 21:42:13 -0400 Subject: [PATCH 19/31] yapf --- .../snippets/transforms/aggregation/approximateunique_test.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py index 56eaac37f1b7..118e45bfaec1 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py @@ -28,10 +28,6 @@ # pytype: skip-file - - - - @mock.patch('apache_beam.Pipeline', TestPipeline) @mock.patch( 'apache_beam.examples.snippets.transforms.aggregation.' 
From 683d6703795f92a77ce7b34ab303b143cab109e4 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 21:52:32 -0400 Subject: [PATCH 20/31] add tolist to catalog --- .../snippets/transforms/aggregation/tolist.py | 56 +++++++++++++++++++ .../transforms/aggregation/tolist_test.py | 46 +++++++++++++++ .../transforms/python/aggregation/tolist.md | 28 ++++++++++ 3 files changed, 130 insertions(+) create mode 100644 sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist.py create mode 100644 sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py create mode 100644 website/www/site/content/en/documentation/transforms/python/aggregation/tolist.md diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist.py new file mode 100644 index 000000000000..393e535e8397 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist.py @@ -0,0 +1,56 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file +# pylint:disable=line-too-long + +# beam-playground: +# name: ToList +# description: Demonstration of ToList transform usage. 
+# multifile: false +# default_example: false +# context_line: 37 +# categories: +# - Core Transforms +# complexity: BASIC +# tags: +# - transforms + + +def tolist(test=None): + # [START tolist] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + listed_produce = ( + pipeline + | 'Create produce' >> beam.Create([ + '🍓', + '🥕', + '🍆', + '🍅' + ]) + | beam.combiners.ToList() + | beam.Map(print)) + # [END tolist] + if test: + test(listed_produce) + + +if __name__ == '__main__': + tolist() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py new file mode 100644 index 000000000000..bf5f52456abb --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py @@ -0,0 +1,46 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import mock +import unittest + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that, equal_to + +from . 
import tolist + + +def identity(x): + return x + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch( + 'apache_beam.examples.snippets.transforms.aggregation.tolist.print', + identity) +# pylint: enable=line-too-long +class BatchElementsTest(unittest.TestCase): + def test_tolist(self): + def check(result): + assert_that(result, equal_to([['🍓', '🥕', '🍆', '🍅']])) + tolist.tolist(check) + + +if __name__ == '__main__': + unittest.main() diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/tolist.md b/website/www/site/content/en/documentation/transforms/python/aggregation/tolist.md new file mode 100644 index 000000000000..cfed99580796 --- /dev/null +++ b/website/www/site/content/en/documentation/transforms/python/aggregation/tolist.md @@ -0,0 +1,28 @@ +--- +title: "BatchElements" +--- + + +# BatchElements + +{{< localstorage language language-py >}} + +{{< button-pydoc path="apache_beam.transforms.combiners" class="ToList" >}} + +## Examples + +{{< playground height="700px" >}} +{{< playground_snippet language="py" path="SDK_PYTHON_ToList" show="tolist" >}} +{{< /playground >}} From fac00d3164165b4997a7a4ff216b5fbfe4f664d2 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 21:52:34 -0400 Subject: [PATCH 21/31] yapf --- .../examples/snippets/transforms/aggregation/tolist.py | 7 +------ .../snippets/transforms/aggregation/tolist_test.py | 1 + 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist.py index 393e535e8397..089b4bc21c76 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist.py @@ -39,12 +39,7 @@ def tolist(test=None): with beam.Pipeline() as pipeline: listed_produce = ( pipeline - | 'Create produce' >> 
beam.Create([ - '🍓', - '🥕', - '🍆', - '🍅' - ]) + | 'Create produce' >> beam.Create(['🍓', '🥕', '🍆', '🍅']) | beam.combiners.ToList() | beam.Map(print)) # [END tolist] diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py index bf5f52456abb..29eaee80fa4d 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py @@ -39,6 +39,7 @@ class BatchElementsTest(unittest.TestCase): def test_tolist(self): def check(result): assert_that(result, equal_to([['🍓', '🥕', '🍆', '🍅']])) + tolist.tolist(check) From 2bb19aa4c21fe03a9591c331a8336a1653132989 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Thu, 28 Mar 2024 21:53:54 -0400 Subject: [PATCH 22/31] update context lines --- .../snippets/transforms/aggregation/approximatequantiles.py | 2 +- .../snippets/transforms/aggregation/approximateunique.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py index e95fdc84e88e..e0f6b8e91408 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py @@ -24,7 +24,7 @@ # description: Demonstration of ApproximateQuantiles transform usage. 
# multifile: false # default_example: false -# context_line: 43 +# context_line: 37 # categories: # complexity: BASIC # tags: diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py index baf2918b2c16..0189064bcd4a 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py @@ -24,7 +24,7 @@ # description: Demonstration of ApproximateUnique transform usage. # multifile: false # default_example: false -# context_line: 43 +# context_line: 37 # categories: # complexity: BASIC # tags: @@ -35,7 +35,6 @@ def approximateunique(test=None): # [START approximateunique] import random - import apache_beam as beam with beam.Pipeline() as pipeline: From 8e66f3c0f2944d8b66c2aee7bbd91b140bd4d812 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Tue, 2 Apr 2024 23:13:04 -0400 Subject: [PATCH 23/31] run isort --- .../transforms/aggregation/approximatequantiles_test.py | 2 +- .../snippets/transforms/aggregation/approximateunique.py | 1 + .../snippets/transforms/aggregation/approximateunique_test.py | 2 +- .../snippets/transforms/aggregation/batchelements_test.py | 2 +- .../examples/snippets/transforms/aggregation/tolist_test.py | 2 +- 5 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py index a064da947562..4709c3627520 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py @@ -18,9 +18,9 @@ # pytype: skip-file -import mock import unittest +import mock from 
apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that, equal_to diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py index 0189064bcd4a..e86f8e3eb563 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py @@ -35,6 +35,7 @@ def approximateunique(test=None): # [START approximateunique] import random + import apache_beam as beam with beam.Pipeline() as pipeline: diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py index 118e45bfaec1..1835a4540d3c 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py @@ -16,10 +16,10 @@ # limitations under the License. # import math -import mock import unittest import apache_beam as beam +import mock from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that, equal_to diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py index 8bce77dd21a0..ab3ea2f8ec7a 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py @@ -16,9 +16,9 @@ # limitations under the License. 
# -import mock import unittest +import mock from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that, equal_to diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py index 29eaee80fa4d..678e9422033f 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py @@ -16,9 +16,9 @@ # limitations under the License. # -import mock import unittest +import mock from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that, equal_to From c6acbf4c39a4d9f733d0b06b8344bc6042d25b41 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Tue, 2 Apr 2024 23:35:06 -0400 Subject: [PATCH 24/31] isort --- .../examples/snippets/transforms/aggregation/tolist_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py index 678e9422033f..29eaee80fa4d 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py @@ -16,9 +16,9 @@ # limitations under the License. 
# +import mock import unittest -import mock from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that, equal_to From 5888aa447c32ba8ff6873fbf2a07f42d560e78d5 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 5 Apr 2024 15:20:20 -0400 Subject: [PATCH 25/31] rework asserts for batchelements_test.py --- .../aggregation/batchelements_test.py | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py index ab3ea2f8ec7a..f85e7172bb0e 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py @@ -19,6 +19,8 @@ import unittest import mock + +import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that, equal_to @@ -26,8 +28,20 @@ def check_batches(actual): - expected = [['🍓', '🥕', '🍆'], ['🍅', '🥕', '🍅'], ['🌽', '🥕', '🍅', '🍆']] - assert_that(actual, equal_to(expected)) + # How the elements are grouped is not guaranteed, so we just + # check that all elements are lists and then count the elements + # to make sure none are lost. 
+ all_elements_are_lists = (actual + | 'Check type' >> beam.Map(lambda x: isinstance(x, list)) + | 'All elements are lists' >> beam.CombineGlobally(all)) + assert_that(all_elements_are_lists, equal_to([True])) + + assert_that(actual + | beam.FlatMap(lambda x: x) + | 'Count' >> beam.combiners.Count.PerElement(), + equal_to([('🍓', 1), ('🥕', 3), ('🍆', 2), ('🍅', 3), ('🌽', 1)]), + label='Check counts') + def identity(x): @@ -37,8 +51,8 @@ def identity(x): @mock.patch('apache_beam.Pipeline', TestPipeline) # pylint: disable=line-too-long @mock.patch( - 'apache_beam.examples.snippets.transforms.aggregation.batchelements.print', - identity) + 'apache_beam.examples.snippets.transforms.aggregation.batchelements.print', + identity) # pylint: enable=line-too-long class BatchElementsTest(unittest.TestCase): def test_batchelements(self): From 7c0a9eb04b92830eaa7f074b05c87c888711b065 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 5 Apr 2024 15:24:42 -0400 Subject: [PATCH 26/31] update tolist test --- .../snippets/transforms/aggregation/tolist_test.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py index 29eaee80fa4d..b38816736df5 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py @@ -19,6 +19,7 @@ import mock import unittest +import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that, equal_to @@ -38,7 +39,17 @@ def identity(x): class BatchElementsTest(unittest.TestCase): def test_tolist(self): def check(result): - assert_that(result, equal_to([['🍓', '🥕', '🍆', '🍅']])) + assert_that( + result + | 'Flatten' >> beam.FlatMap(identity) + | 'Count' >> beam.combiners.Count.PerElement(), + 
equal_to([('🍓', 1), ('🥕', 1), ('🍆', 1), ('🍅', 1)]), + label='Check counts') + assert_that( + result + | beam.Map(lambda x: isinstance(x, list)), + equal_to([True]), + label='Check type') tolist.tolist(check) From f01f08a386fe23d7029477a784b0b0b3039bb8e7 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 5 Apr 2024 15:26:48 -0400 Subject: [PATCH 27/31] add tolist to transform catalog table --- .../site/content/en/documentation/transforms/python/overview.md | 1 + 1 file changed, 1 insertion(+) diff --git a/website/www/site/content/en/documentation/transforms/python/overview.md b/website/www/site/content/en/documentation/transforms/python/overview.md index 433b8d1829b9..c5d9714e695e 100644 --- a/website/www/site/content/en/documentation/transforms/python/overview.md +++ b/website/www/site/content/en/documentation/transforms/python/overview.md @@ -66,6 +66,7 @@ limitations under the License. +
TransformDescription
ApproximateQuantilesGiven a distribution, find the approximate N-tiles.
ApproximateUniqueNot available. See BEAM-6693 for updates.
ApproximateUniqueGiven a pcollection, return the estimated number of unique elements.
CoGroupByKeyTakes several keyed collections of elements and produces a collection where each element consists of a key and all values associated with that key.
CombineGloballyTransforms to combine elements.
CombinePerKeyTransforms to combine elements for each key.
TransformDescription
ApproximateQuantilesGiven a distribution, find the approximate N-tiles.
ApproximateUniqueGiven a pcollection, return the estimated number of unique elements.
BatchElementsGiven a pcollection, batches its elements into lists so that they can be processed together.
CoGroupByKeyTakes several keyed collections of elements and produces a collection where each element consists of a key and all values associated with that key.
CombineGloballyTransforms to combine elements.
CombinePerKeyTransforms to combine elements for each key.
MinGets the element with the minimum value within each aggregation.
SampleRandomly select some number of elements from each aggregation.
SumSums all the elements within each aggregation.
ToListAggregates all elements into a single list.
TopCompute the largest element(s) in each aggregation.
From b76916a669b738724757b015e0e6e6b0dfeafecd Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 5 Apr 2024 15:40:39 -0400 Subject: [PATCH 28/31] update navbar --- .../site/layouts/partials/section-menu/en/documentation.html | 3 +++ 1 file changed, 3 insertions(+) diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html index ad6af867c1b2..1690372f9021 100755 --- a/website/www/site/layouts/partials/section-menu/en/documentation.html +++ b/website/www/site/layouts/partials/section-menu/en/documentation.html @@ -316,6 +316,8 @@ Aggregation
  • From aebcbc3544e1674c4e2f6adfbcef2f4ed8b891d5 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 5 Apr 2024 15:42:21 -0400 Subject: [PATCH 29/31] yapf --- .../aggregation/batchelements_test.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py index f85e7172bb0e..f77290043b97 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py @@ -31,17 +31,18 @@ def check_batches(actual): # How the elements are grouped is not guaranteed, so we just # check that all elements are lists and then count the elements # to make sure none are lost. - all_elements_are_lists = (actual - | 'Check type' >> beam.Map(lambda x: isinstance(x, list)) - | 'All elements are lists' >> beam.CombineGlobally(all)) + all_elements_are_lists = ( + actual + | 'Check type' >> beam.Map(lambda x: isinstance(x, list)) + | 'All elements are lists' >> beam.CombineGlobally(all)) assert_that(all_elements_are_lists, equal_to([True])) - assert_that(actual - | beam.FlatMap(lambda x: x) - | 'Count' >> beam.combiners.Count.PerElement(), - equal_to([('🍓', 1), ('🥕', 3), ('🍆', 2), ('🍅', 3), ('🌽', 1)]), - label='Check counts') - + assert_that( + actual + | beam.FlatMap(lambda x: x) + | 'Count' >> beam.combiners.Count.PerElement(), + equal_to([('🍓', 1), ('🥕', 3), ('🍆', 2), ('🍅', 3), ('🌽', 1)]), + label='Check counts') def identity(x): @@ -51,8 +52,8 @@ def identity(x): @mock.patch('apache_beam.Pipeline', TestPipeline) # pylint: disable=line-too-long @mock.patch( - 'apache_beam.examples.snippets.transforms.aggregation.batchelements.print', - identity) + 'apache_beam.examples.snippets.transforms.aggregation.batchelements.print', + identity) # pylint: enable=line-too-long 
class BatchElementsTest(unittest.TestCase): def test_batchelements(self): From 1390413956ffece2fa9f9ffe75ebb59ad072ca72 Mon Sep 17 00:00:00 2001 From: Joey Tran Date: Fri, 5 Apr 2024 15:42:55 -0400 Subject: [PATCH 30/31] isort --- .../transforms/aggregation/approximatequantiles_test.py | 1 + .../snippets/transforms/aggregation/approximateunique_test.py | 3 ++- .../examples/snippets/transforms/aggregation/tolist_test.py | 3 ++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py index 4709c3627520..b52baf8c36da 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py @@ -21,6 +21,7 @@ import unittest import mock + from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that, equal_to diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py index 1835a4540d3c..2fefc3462155 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py @@ -18,8 +18,9 @@ import math import unittest -import apache_beam as beam import mock + +import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that, equal_to diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py index b38816736df5..ae6d211fdab9 100644 --- 
a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py @@ -16,9 +16,10 @@ # limitations under the License. # -import mock import unittest +import mock + import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that, equal_to From f8c7a63bb0cf7bddac56555454b5a0d4ef1f607e Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Tue, 16 Apr 2024 10:01:05 -0400 Subject: [PATCH 31/31] Lint fixes --- .../transforms/aggregation/approximatequantiles_test.py | 3 ++- .../snippets/transforms/aggregation/approximateunique_test.py | 3 ++- .../snippets/transforms/aggregation/batchelements_test.py | 3 ++- .../examples/snippets/transforms/aggregation/tolist_test.py | 3 ++- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py index b52baf8c36da..2adfcd05b99a 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py @@ -23,7 +23,8 @@ import mock from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that, equal_to +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to from . 
import approximatequantiles diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py index 2fefc3462155..c945cec534b8 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py @@ -22,7 +22,8 @@ import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that, equal_to +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to from . import approximateunique diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py index f77290043b97..b370f8cd16be 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py @@ -22,7 +22,8 @@ import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that, equal_to +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to from . 
import batchelements diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py index ae6d211fdab9..8f5235000c49 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py @@ -22,7 +22,8 @@ import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.testing.util import assert_that, equal_to +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to from . import tolist