Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update python transform catalog #30788

Merged
merged 32 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
82f3e23
Remove unavailable transforms from catalog
hjtran Mar 28, 2024
b6a4503
Implement approximatequantile example
hjtran Mar 28, 2024
f0a75ba
add context to groupbykey example
hjtran Mar 28, 2024
b8fde1f
add in live groupbykey sort in example
hjtran Mar 28, 2024
7736c4d
add approximatequantiles example and update catalog
hjtran Mar 28, 2024
25c3812
Add approximateunique example and link to catalog
hjtran Mar 28, 2024
18bd106
fix approxquantiles reference
hjtran Mar 28, 2024
7376ac5
update approximateunique catalog entry
hjtran Mar 28, 2024
ffa6a83
dd batchelements into catalog
hjtran Mar 28, 2024
ecc0c98
Conform approx examples to referencing transforms directly on beam
hjtran Mar 28, 2024
fe13579
yapf
hjtran Mar 28, 2024
3bc373a
add batchelements example page
hjtran Mar 28, 2024
83be45a
fix approximateunique extra pipeline
hjtran Mar 28, 2024
340dbfc
fix approximatequantiles
hjtran Mar 28, 2024
144c981
fix lint
hjtran Mar 28, 2024
e7fcf70
fix lint
hjtran Mar 28, 2024
471a89a
fix typo in distinct example
hjtran Mar 28, 2024
b2d7fdc
isort
hjtran Mar 29, 2024
0e8429d
yapf
hjtran Mar 29, 2024
683d670
add tolist to catalog
hjtran Mar 29, 2024
fac00d3
yapf
hjtran Mar 29, 2024
2bb19aa
update context lines
hjtran Mar 29, 2024
8e66f3c
run isort
hjtran Apr 3, 2024
c6acbf4
isort
hjtran Apr 3, 2024
5888aa4
rework asserts for batchelements_test.py
hjtran Apr 5, 2024
7c0a9eb
update tolist test
hjtran Apr 5, 2024
f01f08a
add tolist to transform catalog table
hjtran Apr 5, 2024
b76916a
update navbar
hjtran Apr 5, 2024
12522ae
Merge branch 'master' into updateexamples
hjtran Apr 5, 2024
aebcbc3
yapf
hjtran Apr 5, 2024
1390413
isort
hjtran Apr 5, 2024
f8c7a63
Lint fixes
damccorm Apr 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
# - group
# - strings

# [START groupbykey]
import apache_beam as beam

with beam.Pipeline() as p:
Expand All @@ -35,3 +36,4 @@
| beam.Map(lambda word: (word[0], word))
| beam.GroupByKey()
| beam.LogElements())
# [END groupbykey]
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file
# pylint:disable=line-too-long

# beam-playground:
# name: ApproximateQuantiles
# description: Demonstration of ApproximateQuantiles transform usage.
# multifile: false
# default_example: false
# context_line: 37
# categories:
# complexity: BASIC
# tags:
# - transforms
# - integers


def approximatequantiles(test=None):
  # [START approximatequantiles]
  import apache_beam as beam

  with beam.Pipeline() as pipeline:
    # Feed the integers 0..1000 through ApproximateQuantiles with 5 quantiles
    # requested; the quantiles arrive as a single list element, which is then
    # handed to print.
    quantiles = (
        pipeline
        | 'Create data' >> beam.Create(list(range(1001)))
        | 'Compute quantiles' >> beam.ApproximateQuantiles.Globally(5)
        | beam.Map(print))
    # [END approximatequantiles]
    # `test` is an optional callback that receives the output PCollection so
    # a unit test can attach assertions while the pipeline is still being
    # built. It is skipped when the example runs as a script.
    if test:
      test(quantiles)


if __name__ == '__main__':
  approximatequantiles()
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file

import unittest

import mock

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
damccorm marked this conversation as resolved.
Show resolved Hide resolved

from . import approximatequantiles


@mock.patch('apache_beam.Pipeline', TestPipeline)
@mock.patch(
    'apache_beam.examples.snippets.transforms.aggregation.'
    'approximatequantiles.print',
    lambda element: element)
class ApproximateQuantilesTest(unittest.TestCase):
  """Runs the approximatequantiles example on a TestPipeline.

  `print` inside the example module is replaced by a pass-through so the
  computed quantiles stay in the output PCollection and can be asserted on.
  """
  def test_approximatequantiles(self):
    # The example is expected to emit one element: the list of 5 quantiles
    # over the integers 0..1000.
    expected = [[0, 250, 500, 750, 1000]]

    def verify(output):
      assert_that(output, equal_to(expected))

    approximatequantiles.approximatequantiles(test=verify)


if __name__ == '__main__':
  unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file
# pylint:disable=line-too-long

# beam-playground:
# name: ApproximateUnique
# description: Demonstration of ApproximateUnique transform usage.
# multifile: false
# default_example: false
# context_line: 37
# categories:
# complexity: BASIC
# tags:
# - transforms
# - integers


def approximateunique(test=None):
  # [START approximateunique]
  import random

  import apache_beam as beam

  with beam.Pipeline() as pipeline:
    # 1000 distinct integers, shuffled so the input order is arbitrary.
    data = list(range(1000))
    random.shuffle(data)
    # Estimate the number of distinct values using a sample of size 16, then
    # hand the estimate to print.
    result = (
        pipeline
        | 'create' >> beam.Create(data)
        | 'get_estimate' >> beam.ApproximateUnique.Globally(size=16)
        | beam.Map(print))
    # [END approximateunique]
    # `test` is an optional callback that receives the output PCollection so
    # a unit test can attach assertions while the pipeline is still being
    # built. It is skipped when the example runs as a script.
    if test:
      test(result)


# Script entry point, matching the other transform examples; without it,
# executing this file directly would define the function and do nothing.
if __name__ == '__main__':
  approximateunique()
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import math
import unittest

import mock

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
damccorm marked this conversation as resolved.
Show resolved Hide resolved

from . import approximateunique

# pytype: skip-file


@mock.patch('apache_beam.Pipeline', TestPipeline)
@mock.patch(
    'apache_beam.examples.snippets.transforms.aggregation.'
    'approximateunique.print',
    lambda element: element)
class ApproximateUniqueTest(unittest.TestCase):
  """Runs the approximateunique example on a TestPipeline.

  `print` inside the example module is replaced by a pass-through so the
  estimate stays in the output PCollection and can be asserted on.
  """
  def test_approximateunique(self):
    # The example feeds 1000 distinct values and samples with size 16. The
    # estimate is accepted when its relative error is at most
    # 2 / sqrt(sample_size), the tolerance this test has always used.
    true_count = 1000
    sample_size = 16
    tolerance = 2 / math.sqrt(sample_size)

    def relative_error_ok(estimate):
      # Wrapped in a list because it is applied with FlatMap.
      return [abs(estimate - true_count) * 1.0 / true_count <= tolerance]

    def verify(estimates):
      within_bounds = estimates | 'compare' >> beam.FlatMap(relative_error_ok)
      assert_that(within_bounds, equal_to([True]))

    approximateunique.approximateunique(test=verify)


if __name__ == '__main__':
  unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file
# pylint:disable=line-too-long

# beam-playground:
# name: BatchElements
# description: Demonstration of BatchElements transform usage.
# multifile: false
# default_example: false
# context_line: 39
# categories:
# - Core Transforms
# complexity: BASIC
# tags:
# - transforms
# - strings
# - group


def batchelements(test=None):
  # [START batchelements]
  import apache_beam as beam

  # Ten produce items, some repeated, to be grouped into batches.
  produce = [
      '🍓', '🥕', '🍆', '🍅', '🥕',
      '🍅', '🌽', '🥕', '🍅', '🍆'
  ]

  with beam.Pipeline() as pipeline:
    items = pipeline | 'Create produce' >> beam.Create(produce)
    # Ask for batches of between 3 and 5 elements and hand each one to print.
    batches = (
        items
        | beam.BatchElements(min_batch_size=3, max_batch_size=5)
        | beam.Map(print))
    # [END batchelements]
    # `test` is an optional callback that receives the output PCollection so
    # a unit test can attach assertions while the pipeline is still being
    # built. It is skipped when the example runs as a script.
    if test:
      test(batches)


if __name__ == '__main__':
  batchelements()
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest

import mock

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
damccorm marked this conversation as resolved.
Show resolved Hide resolved

from . import batchelements


def check_batches(actual):
  """Attach assertions to the PCollection of batches in `actual`.

  How elements are split into batches is not guaranteed, so exact batches are
  never compared. Two invariants are asserted instead: every output element
  is a list, and flattening the batches yields each produce item the expected
  number of times, so nothing is lost or duplicated.
  """
  expected_counts = [('🍓', 1), ('🥕', 3), ('🍆', 2), ('🍅', 3), ('🌽', 1)]

  # Invariant 1: every emitted batch is a list.
  list_flags = actual | 'Check type' >> beam.Map(
      lambda batch: isinstance(batch, list))
  only_lists = (
      list_flags | 'All elements are lists' >> beam.CombineGlobally(all))
  assert_that(only_lists, equal_to([True]))

  # Invariant 2: per-item counts across all batches match the input. This
  # second assert_that needs its own label.
  item_counts = (
      actual
      | beam.FlatMap(lambda batch: batch)
      | 'Count' >> beam.combiners.Count.PerElement())
  assert_that(item_counts, equal_to(expected_counts), label='Check counts')


def identity(x):
  """Return `x` unchanged; used as a pass-through replacement for print."""
  return x


@mock.patch('apache_beam.Pipeline', TestPipeline)
# pylint: disable=line-too-long
@mock.patch(
    'apache_beam.examples.snippets.transforms.aggregation.batchelements.print',
    identity)
# pylint: enable=line-too-long
class BatchElementsTest(unittest.TestCase):
  """Runs the batchelements example on a TestPipeline.

  `print` inside the example module is replaced by `identity` so the batches
  stay in the output PCollection for `check_batches` to inspect.
  """
  def test_batchelements(self):
    # check_batches attaches the assertions to the example's output.
    batchelements.batchelements(test=check_batches)


if __name__ == '__main__':
  unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file
# pylint:disable=line-too-long

# beam-playground:
# name: ToList
# description: Demonstration of ToList transform usage.
# multifile: false
# default_example: false
# context_line: 37
# categories:
# - Core Transforms
# complexity: BASIC
# tags:
# - transforms


def tolist(test=None):
  # [START tolist]
  import apache_beam as beam

  with beam.Pipeline() as pipeline:
    produce = (
        pipeline
        | 'Create produce' >> beam.Create(['🍓', '🥕', '🍆', '🍅']))
    # Apply the ToList combiner to the produce items and hand the result to
    # print.
    listed_produce = produce | beam.combiners.ToList() | beam.Map(print)
    # [END tolist]
    # `test` is an optional callback that receives the output PCollection so
    # a unit test can attach assertions while the pipeline is still being
    # built. It is skipped when the example runs as a script.
    if test:
      test(listed_produce)


if __name__ == '__main__':
  tolist()
Loading
Loading