Skip to content

Commit

Permalink
Cleanup and fix combiners_test.
Browse files (browse the repository at this point in the history)
  • Loading branch information
robertwb committed Jul 22, 2016
1 parent 583915d commit f18f4f0
Showing 1 changed file with 9 additions and 11 deletions.
20 changes: 9 additions & 11 deletions sdks/python/apache_beam/transforms/combiners_test.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -79,12 +79,11 @@ def test_top(self):
assert_that(result_cmp, equal_to([[9, 6, 6, 5, 3, 2]]), label='assert:cmp')

# Again for per-key combines.
pcoll = pipeline | Create(
'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
pcoll = pipeline | 'start-perkye' >> Create(
[('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
result_key_top = pcoll | 'top-perkey' >> combine.Top.LargestPerKey(5)
result_key_bot = pcoll | 'bot-perkey' >> combine.Top.SmallestPerKey(4)
result_key_cmp = pcoll | combine.Top.PerKey(
'cmp-perkey',
result_key_cmp = pcoll | 'cmp-perkey' >> combine.Top.PerKey(
6,
lambda a, b, names: len(names[a]) < len(names[b]),
names) # Note parameter passed to comparator.
Expand All @@ -105,11 +104,11 @@ def test_top_shorthands(self):
assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')

pcoll = pipeline | Create(
'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
pcoll = pipeline | 'start-perkey' >> Create(
[('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
result_ktop = pcoll | 'top-perkey' >> beam.CombinePerKey(combine.Largest(5))
result_kbot = pcoll | beam.CombinePerKey(
'bot-perkey', combine.Smallest(4))
result_kbot = pcoll | 'bot-perkey' >> beam.CombinePerKey(
combine.Smallest(4))
assert_that(result_ktop, equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top')
assert_that(result_kbot, equal_to([('a', [0, 1, 1, 1])]), label='k:bot')
pipeline.run()
Expand Down Expand Up @@ -138,8 +137,7 @@ def match(actual):

# Now test per-key samples.
pipeline = Pipeline('DirectPipelineRunner')
pcoll = pipeline | Create(
'start-perkey',
pcoll = pipeline | 'start-perkey' >> Create(
sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3)

Expand All @@ -158,7 +156,7 @@ def test_tuple_combine_fn(self):
p = Pipeline('DirectPipelineRunner')
result = (
p
| 'a' >> Create([(100, 0.0), ('b', 10, -1), ('c', 1, 100)])
| Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
| beam.CombineGlobally(combine.TupleCombineFn(max,
combine.MeanCombineFn(),
sum)).without_defaults())
Expand Down

0 comments on commit f18f4f0

Please sign in to comment.