diff --git a/learning/katas/python/Core Transforms/GroupByKey/GroupByKey/task.py b/learning/katas/python/Core Transforms/GroupByKey/GroupByKey/task.py index 7e5d3fb954f9..2832de39cad4 100644 --- a/learning/katas/python/Core Transforms/GroupByKey/GroupByKey/task.py +++ b/learning/katas/python/Core Transforms/GroupByKey/GroupByKey/task.py @@ -27,6 +27,7 @@ # - group # - strings +# [START groupbykey] import apache_beam as beam with beam.Pipeline() as p: @@ -35,3 +36,4 @@ | beam.Map(lambda word: (word[0], word)) | beam.GroupByKey() | beam.LogElements()) +# [END groupbykey] diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py new file mode 100644 index 000000000000..e0f6b8e91408 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles.py @@ -0,0 +1,51 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file +# pylint:disable=line-too-long + +# beam-playground: +# name: AppromximateQuantiles +# description: Demonstration of ApproximateQuantiles transform usage. +# multifile: false +# default_example: false +# context_line: 37 +# categories: +# complexity: BASIC +# tags: +# - transforms +# - integers + + +def approximatequantiles(test=None): + # [START quantiles] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + quantiles = ( + pipeline + | 'Create data' >> beam.Create(list(range(1001))) + | 'Compute quantiles' >> beam.ApproximateQuantiles.Globally(5) + | beam.Map(print)) + # [END approximatequantiles] + if test: + test(quantiles) + + +if __name__ == '__main__': + approximatequantiles() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py new file mode 100644 index 000000000000..2adfcd05b99a --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximatequantiles_test.py @@ -0,0 +1,46 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +import unittest + +import mock + +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from . import approximatequantiles + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +@mock.patch( + 'apache_beam.examples.snippets.transforms.aggregation.' + 'approximatequantiles.print', + lambda x: x) +class ApproximateQuantilesTest(unittest.TestCase): + def test_approximatequantiles(self): + def check_result(quantiles): + assert_that(quantiles, equal_to([[0, 250, 500, 750, 1000]])) + + approximatequantiles.approximatequantiles(test=check_result) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py new file mode 100644 index 000000000000..e86f8e3eb563 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique.py @@ -0,0 +1,51 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file +# pylint:disable=line-too-long + +# beam-playground: +# name: AppromximateUnique +# description: Demonstration of ApproximateUnique transform usage. +# multifile: false +# default_example: false +# context_line: 37 +# categories: +# complexity: BASIC +# tags: +# - transforms +# - integers + + +def approximateunique(test=None): + # [START approximateunique] + import random + + import apache_beam as beam + + with beam.Pipeline() as pipeline: + data = list(range(1000)) + random.shuffle(data) + result = ( + pipeline + | 'create' >> beam.Create(data) + | 'get_estimate' >> beam.ApproximateUnique.Globally(size=16) + | beam.Map(print)) + # [END approximateunique] + if test: + test(result) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py new file mode 100644 index 000000000000..c945cec534b8 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/approximateunique_test.py @@ -0,0 +1,54 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import math +import unittest + +import mock + +import apache_beam as beam +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from . import approximateunique + +# pytype: skip-file + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +@mock.patch( + 'apache_beam.examples.snippets.transforms.aggregation.' + 'approximateunique.print', + lambda x: x) +class ApproximateUniqueTest(unittest.TestCase): + def test_approximateunique(self): + def check_result(approx_count): + actual_count = 1000 + sample_size = 16 + error = 2 / math.sqrt(sample_size) + assert_that( + approx_count + | 'compare' >> beam.FlatMap( + lambda x: [abs(x - actual_count) * 1.0 / actual_count <= error]), + equal_to([True])) + + approximateunique.approximateunique(test=check_result) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements.py new file mode 100644 index 000000000000..9e1bb7c6fea5 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements.py @@ -0,0 +1,64 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file +# pylint:disable=line-too-long + +# beam-playground: +# name: BatchElements +# description: Demonstration of BatchElements transform usage. +# multifile: false +# default_example: false +# context_line: 39 +# categories: +# - Core Transforms +# complexity: BASIC +# tags: +# - transforms +# - strings +# - group + + +def batchelements(test=None): + # [START batchelements] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + batches = ( + pipeline + | 'Create produce' >> beam.Create([ + '🍓', + '🥕', + '🍆', + '🍅', + '🥕', + '🍅', + '🌽', + '🥕', + '🍅', + '🍆', + ]) + | beam.BatchElements(min_batch_size=3, max_batch_size=5) + | beam.Map(print)) + # [END batchelements] + if test: + test(batches) + + +if __name__ == '__main__': + batchelements() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py new file mode 100644 index 000000000000..b370f8cd16be --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/batchelements_test.py @@ -0,0 +1,65 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest + +import mock + +import apache_beam as beam +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from . import batchelements + + +def check_batches(actual): + # How the elements are grouped is not guaranteed, so we just + # check that all elements are lists and then count the elements + # to make sure none are lost. + all_elements_are_lists = ( + actual + | 'Check type' >> beam.Map(lambda x: isinstance(x, list)) + | 'All elements are lists' >> beam.CombineGlobally(all)) + assert_that(all_elements_are_lists, equal_to([True])) + + assert_that( + actual + | beam.FlatMap(lambda x: x) + | 'Count' >> beam.combiners.Count.PerElement(), + equal_to([('🍓', 1), ('🥕', 3), ('🍆', 2), ('🍅', 3), ('🌽', 1)]), + label='Check counts') + + +def identity(x): + return x + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch( + 'apache_beam.examples.snippets.transforms.aggregation.batchelements.print', + identity) +# pylint: enable=line-too-long +class BatchElementsTest(unittest.TestCase): + def test_batchelements(self): + batchelements.batchelements(check_batches) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist.py new file mode 100644 index 000000000000..089b4bc21c76 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist.py @@ -0,0 +1,51 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file +# pylint:disable=line-too-long + +# beam-playground: +# name: ToList +# description: Demonstration of ToList transform usage. +# multifile: false +# default_example: false +# context_line: 37 +# categories: +# - Core Transforms +# complexity: BASIC +# tags: +# - transforms + + +def tolist(test=None): + # [START tolist] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + listed_produce = ( + pipeline + | 'Create produce' >> beam.Create(['🍓', '🥕', '🍆', '🍅']) + | beam.combiners.ToList() + | beam.Map(print)) + # [END tolist] + if test: + test(listed_produce) + + +if __name__ == '__main__': + tolist() diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py new file mode 100644 index 000000000000..8f5235000c49 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/tolist_test.py @@ -0,0 +1,60 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest + +import mock + +import apache_beam as beam +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +from . import tolist + + +def identity(x): + return x + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +# pylint: disable=line-too-long +@mock.patch( + 'apache_beam.examples.snippets.transforms.aggregation.tolist.print', + identity) +# pylint: enable=line-too-long +class BatchElementsTest(unittest.TestCase): + def test_tolist(self): + def check(result): + assert_that( + result + | 'Flatten' >> beam.FlatMap(identity) + | 'Count' >> beam.combiners.Count.PerElement(), + equal_to([('🍓', 1), ('🥕', 1), ('🍆', 1), ('🍅', 1)]), + label='Check counts') + assert_that( + result + | beam.Map(lambda x: isinstance(x, list)), + equal_to([True]), + label='Check type') + + tolist.tolist(check) + + +if __name__ == '__main__': + unittest.main() diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/approximatequantiles.md b/website/www/site/content/en/documentation/transforms/python/aggregation/approximatequantiles.md index adb6d09e768a..95f66490f5eb 100644 --- a/website/www/site/content/en/documentation/transforms/python/aggregation/approximatequantiles.md +++ b/website/www/site/content/en/documentation/transforms/python/aggregation/approximatequantiles.md @@ -17,7 +17,14 @@ limitations under the License. # ApproximateQuantiles +{{< localstorage language language-py >}} + +{{< button-pydoc path="apache_beam.transforms.stat" class="ApproximateQuantile" >}} + ## Examples -See [Issue 19547](https://github.com/apache/beam/issues/19547) for updates. + +{{< playground height="700px" >}} +{{< playground_snippet language="py" path="SDK_PYTHON_ApproximateQuantiles" show="approximatequantiles" >}} +{{< /playground >}} ## Related transforms diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/approximateunique.md b/website/www/site/content/en/documentation/transforms/python/aggregation/approximateunique.md index dbdfb51e46f9..dae287198fe5 100644 --- a/website/www/site/content/en/documentation/transforms/python/aggregation/approximateunique.md +++ b/website/www/site/content/en/documentation/transforms/python/aggregation/approximateunique.md @@ -16,7 +16,14 @@ limitations under the License. --> # ApproximateUnique +{{< localstorage language language-py >}} + +{{< button-pydoc path="apache_beam.transforms.stat" class="ApproximateUnique" >}} + ## Examples -See [Issue 19547](https://github.com/apache/beam/issues/19547) for updates. + +{{< playground height="700px" >}} +{{< playground_snippet language="py" path="SDK_PYTHON_ApproximateUnique" show="approximateunique" >}} +{{< /playground >}} ## Related transforms diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/batchelements.md b/website/www/site/content/en/documentation/transforms/python/aggregation/batchelements.md new file mode 100644 index 000000000000..c9d6a7ac6d07 --- /dev/null +++ b/website/www/site/content/en/documentation/transforms/python/aggregation/batchelements.md @@ -0,0 +1,31 @@ +--- +title: "BatchElements" +--- + + +# BatchElements + +{{< localstorage language language-py >}} + +{{< button-pydoc path="apache_beam.transforms.stat" class="BatchElements" >}} + +## Examples + +{{< playground height="700px" >}} +{{< playground_snippet language="py" path="SDK_PYTHON_BatchElements" show="batchelements" >}} +{{< /playground >}} + +## Related transforms +* [GroupIntoBatches](/documentation/transforms/python/aggregation/groupintobatches) batches elements by key diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/distinct.md b/website/www/site/content/en/documentation/transforms/python/aggregation/distinct.md index 24abe6bdd247..e6701d63baea 100644 --- a/website/www/site/content/en/documentation/transforms/python/aggregation/distinct.md +++ b/website/www/site/content/en/documentation/transforms/python/aggregation/distinct.md @@ -30,7 +30,7 @@ In the following example, we create a pipeline with two `PCollection`s of produc We use `Distinct` to get rid of duplicate elements, which outputs a `PCollection` of all the unique elements. {{< playground height="700px" >}} -{{< playground_snippet language="py" path="SDK_PYTHON_Distinct" show="distinc" >}} +{{< playground_snippet language="py" path="SDK_PYTHON_Distinct" show="distinct" >}} {{< /playground >}} ## Related transforms diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/groupbykey.md b/website/www/site/content/en/documentation/transforms/python/aggregation/groupbykey.md index 5ba04bc2fe1a..e3f287da7d04 100644 --- a/website/www/site/content/en/documentation/transforms/python/aggregation/groupbykey.md +++ b/website/www/site/content/en/documentation/transforms/python/aggregation/groupbykey.md @@ -32,17 +32,9 @@ See more information in the [Beam Programming Guide](/documentation/programming- We use `GroupByKey` to group all the produce for each season. -{{< highlight language="py" file="sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey.py" >}} -{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey.py" groupbykey >}} -{{< /highlight >}} - -{{< paragraph class="notebook-skip" >}} -Output: -{{< /paragraph >}} - -{{< highlight class="notebook-skip" >}} -{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey_test.py" produce_counts >}} -{{< /highlight >}} +{{< playground height="700px" >}} +{{< playground_snippet language="py" path="SDK_PYTHON_GroupByKey" show="groupbykeysort" >}} +{{< /playground >}} **Example 2**: diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/tolist.md b/website/www/site/content/en/documentation/transforms/python/aggregation/tolist.md new file mode 100644 index 000000000000..cfed99580796 --- /dev/null +++ b/website/www/site/content/en/documentation/transforms/python/aggregation/tolist.md @@ -0,0 +1,28 @@ +--- +title: "BatchElements" +--- + + +# BatchElements + +{{< localstorage language language-py >}} + +{{< button-pydoc path="apache_beam.transforms.combiners" class="ToList" >}} + +## Examples + +{{< playground height="700px" >}} +{{< playground_snippet language="py" path="SDK_PYTHON_ToList" show="tolist" >}} +{{< /playground >}} diff --git a/website/www/site/content/en/documentation/transforms/python/overview.md b/website/www/site/content/en/documentation/transforms/python/overview.md index a8982fa9798e..c5d9714e695e 100644 --- a/website/www/site/content/en/documentation/transforms/python/overview.md +++ b/website/www/site/content/en/documentation/transforms/python/overview.md @@ -48,13 +48,13 @@ limitations under the License.
Transform | Description |
---|---|
ApproximateQuantiles | Not available. See BEAM-6694 for updates. |
ApproximateUnique | Not available. See BEAM-6693 for updates. |
ApproximateQuantiles | Given a distribution, find the approximate N-tiles. |
ApproximateUnique | Given a pcollection, return the estimated number of unique elements. |
BatchElements | Given a pcollection, return the estimated number of unique elements. |
CoGroupByKey | Takes several keyed collections of elements and produces a collection where each element consists of a key and all values associated with that key. |
CombineGlobally | Transforms to combine elements. |
CombinePerKey | Transforms to combine elements for each key. |
CombineValues | Transforms to combine keyed iterables. |
CombineWithContext | Not available. |
Count | Counts the number of elements within each aggregation. |
Distinct | Produces a collection containing distinct elements from the input collection. |
GroupByKey | Takes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key. |
Min | Gets the element with the minimum value within each aggregation. |
Sample | Randomly select some number of elements from each aggregation. |
Sum | Sums all the elements within each aggregation. |
ToList | Aggregates all elements into a single list. |
Top | Compute the largest element(s) in each aggregation. |