Skip to content

Commit

Permalink
Update python transform catalog (#30788)
Browse files Browse the repository at this point in the history
* Remove unavailable transforms from catalog

* Implement approximatequantile example

* add context to groupbykey example

* add in live groupbykey sort in example

* add approximatequantiles example and update catalog

* Add approximateunique example and link to catalog

* fix approxquantiles reference

* update approximateunique catalog entry

* add batchelements into catalog

* Conform approx examples to referencing transforms directly on beam

* yapf

* add batchelements example page

* fix approximateunique extra pipeline

* fix approximatequantiles

* fix lint

* fix lint

* fix typo in distinct example

* isort

* yapf

* add tolist to catalog

* yapf

* update context lines

* run isort

* isort

* rework asserts for batchelements_test.py

* update tolist test

* add tolist to transform catalog table

* update navbar

* yapf

* isort

* Lint fixes

---------

Co-authored-by: Danny McCormick <dannymccormick@google.com>
  • Loading branch information
hjtran and damccorm authored Apr 16, 2024
1 parent 995d1bf commit 277b6c3
Show file tree
Hide file tree
Showing 16 changed files with 529 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
# - group
# - strings

# [START groupbykey]
import apache_beam as beam

with beam.Pipeline() as p:
Expand All @@ -35,3 +36,4 @@
| beam.Map(lambda word: (word[0], word))
| beam.GroupByKey()
| beam.LogElements())
# [END groupbykey]
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file
# pylint:disable=line-too-long

# beam-playground:
# name: ApproximateQuantiles
# description: Demonstration of ApproximateQuantiles transform usage.
# multifile: false
# default_example: false
# context_line: 37
# categories:
# complexity: BASIC
# tags:
# - transforms
# - integers


def approximatequantiles(test=None):
  # [START approximatequantiles]
  import apache_beam as beam

  with beam.Pipeline() as pipeline:
    quantiles = (
        pipeline
        | 'Create data' >> beam.Create(list(range(1001)))
        | 'Compute quantiles' >> beam.ApproximateQuantiles.Globally(5)
        | beam.Map(print))
    # [END approximatequantiles]
    # Notes are kept below the END marker on purpose:
    #  * The START and END markers must carry the same tag so the region
    #    between them can be extracted as a single snippet.
    #  * No docstring or extra lines are added above the snippet because the
    #    `context_line` value in the beam-playground header is a fixed line
    #    number in this file (presumably the first snippet line - confirm
    #    before moving anything above it).
    #
    # `test` is an optional callable. When given, it receives the output
    # PCollection while the pipeline is still open, so the caller can attach
    # further transforms (for example assertions) before the pipeline runs.
    if test:
      test(quantiles)


if __name__ == '__main__':
  # Run the example when the file is executed as a script.
  approximatequantiles()
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file

import unittest

import mock

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

from . import approximatequantiles


@mock.patch('apache_beam.Pipeline', TestPipeline)
@mock.patch(
    'apache_beam.examples.snippets.transforms.aggregation.'
    'approximatequantiles.print',
    lambda x: x)
class ApproximateQuantilesTest(unittest.TestCase):
  """Runs the approximatequantiles example and verifies its output.

  `apache_beam.Pipeline` is swapped for `TestPipeline`, and `print` inside the
  example module is replaced by a pass-through so the final `Map` step emits
  its input unchanged for the assertion below.
  """
  def test_approximatequantiles(self):
    # The example asks for 5 quantiles over 0..1000, so the output is expected
    # to be a single element holding the five boundary values.
    expected = [[0, 250, 500, 750, 1000]]

    def verify(pcoll):
      assert_that(pcoll, equal_to(expected))

    approximatequantiles.approximatequantiles(test=verify)


if __name__ == '__main__':
  unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file
# pylint:disable=line-too-long

# beam-playground:
# name: ApproximateUnique
# description: Demonstration of ApproximateUnique transform usage.
# multifile: false
# default_example: false
# context_line: 37
# categories:
# complexity: BASIC
# tags:
# - transforms
# - integers


def approximateunique(test=None):
  # [START approximateunique]
  import random

  import apache_beam as beam

  with beam.Pipeline() as pipeline:
    data = list(range(1000))
    random.shuffle(data)
    result = (
        pipeline
        | 'create' >> beam.Create(data)
        | 'get_estimate' >> beam.ApproximateUnique.Globally(size=16)
        | beam.Map(print))
    # [END approximateunique]
    # Notes are kept below the END marker so the region between the markers
    # stays exactly the code sample, and so nothing shifts the line that the
    # `context_line` value in the beam-playground header points at.
    #
    # `test` is an optional callable. When given, it receives the output
    # PCollection while the pipeline is still open, so the caller can attach
    # further transforms (for example assertions) before the pipeline runs.
    if test:
      test(result)


if __name__ == '__main__':
  # Without this guard, executing the file as a script only defines the
  # function and never runs the pipeline. The sibling examples
  # (approximatequantiles, batchelements) end the same way.
  approximateunique()
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import math
import unittest

import mock

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

from . import approximateunique

# pytype: skip-file


@mock.patch('apache_beam.Pipeline', TestPipeline)
@mock.patch(
    'apache_beam.examples.snippets.transforms.aggregation.'
    'approximateunique.print',
    lambda x: x)
class ApproximateUniqueTest(unittest.TestCase):
  """Runs the approximateunique example and checks the estimate is plausible.

  `apache_beam.Pipeline` is swapped for `TestPipeline`, and `print` inside the
  example module is replaced by a pass-through so the estimate reaches the
  assertion unchanged.
  """
  def test_approximateunique(self):
    # These mirror the example: 1000 distinct inputs and a sample size of 16.
    actual_count = 1000
    sample_size = 16
    # Largest relative error accepted: 2 / sqrt(16) = 0.5.
    error = 2 / math.sqrt(sample_size)

    def within_tolerance(estimate):
      # True when the estimate's relative error is within the accepted bound.
      return abs(estimate - actual_count) / actual_count <= error

    def check_result(approx_count):
      verdict = approx_count | 'compare' >> beam.Map(within_tolerance)
      assert_that(verdict, equal_to([True]))

    approximateunique.approximateunique(test=check_result)


if __name__ == '__main__':
  unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file
# pylint:disable=line-too-long

# beam-playground:
# name: BatchElements
# description: Demonstration of BatchElements transform usage.
# multifile: false
# default_example: false
# context_line: 39
# categories:
# - Core Transforms
# complexity: BASIC
# tags:
# - transforms
# - strings
# - group


def batchelements(test=None):
  # [START batchelements]
  import apache_beam as beam

  with beam.Pipeline() as pipeline:
    produce = ['🍓', '🥕', '🍆', '🍅', '🥕', '🍅', '🌽', '🥕', '🍅', '🍆']
    batches = (
        pipeline
        | 'Create produce' >> beam.Create(produce)
        | beam.BatchElements(min_batch_size=3, max_batch_size=5)
        | beam.Map(print))
    # [END batchelements]
    # `test` is an optional callable. When given, it receives the batched
    # output PCollection while the pipeline is still open, so the caller can
    # attach further transforms (for example assertions) before it runs.
    if test:
      test(batches)


if __name__ == '__main__':
  # Run the example when the file is executed as a script.
  batchelements()
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest

import mock

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

from . import batchelements


def check_batches(actual):
  """Attach assertions to the PCollection produced by the batchelements example.

  How elements are grouped into batches is not guaranteed, so batch contents
  are not compared directly. Instead two properties are checked:
    * every element of `actual` is a list, and
    * flattening the batches gives back each input the right number of times,
      so nothing was lost or duplicated.
  """
  expected_counts = [('🍓', 1), ('🥕', 3), ('🍆', 2), ('🍅', 3), ('🌽', 1)]

  # Property 1: every batch is a list.
  list_flags = (
      actual
      | 'Check type' >> beam.Map(lambda batch: isinstance(batch, list)))
  all_elements_are_lists = (
      list_flags
      | 'All elements are lists' >> beam.CombineGlobally(all))
  assert_that(all_elements_are_lists, equal_to([True]))

  # Property 2: per-element counts across all batches match the input.
  flattened = actual | beam.FlatMap(lambda batch: batch)
  counts = flattened | 'Count' >> beam.combiners.Count.PerElement()
  # A distinct label is given here because the assertion above already uses
  # the default one.
  assert_that(counts, equal_to(expected_counts), label='Check counts')


def identity(x):
  """Return `x` unchanged.

  Used in this file as the replacement for `print` in the example module, so
  the example's final `Map` step passes its input through for the assertions.
  """
  return x


@mock.patch('apache_beam.Pipeline', TestPipeline)
@mock.patch(
    'apache_beam.examples.snippets.transforms.aggregation.'
    'batchelements.print',
    identity)
class BatchElementsTest(unittest.TestCase):
  """Runs the batchelements example under TestPipeline and validates batches.

  `print` inside the example module is replaced by `identity` so the batches
  reach `check_batches` unchanged.
  """
  def test_batchelements(self):
    batchelements.batchelements(test=check_batches)


if __name__ == '__main__':
  unittest.main()
Loading

0 comments on commit 277b6c3

Please sign in to comment.