Unit Testing in Beam Blog Post #31701
Conversation
R: @liferoad
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

The following cover other testing best practices:
shall we mention a lot of testing utils under https://github.com/apache/beam/tree/master/sdks/python/apache_beam/testing?
Are these useful for customers writing pipeline? IIUC these utils are designed more for tests written in the Beam repo
Users should generally use assert_that, equal_to, and (for streaming) test stream.
@@ -0,0 +1,149 @@
---
title: "So You Want to Write Tests on Your Beam Pipeline?"
date: 2024-07-08 00:00:01 -0800
Note - this isn't rendering in https://apache-beam-website-pull-requests.storage.googleapis.com/31701/blog/index.html
I suspect it is because the date is in the future, but I'm not sure. If so, that's a neat way to gate blog posts for a given time I guess
Hm, I also don't see it rendered at https://apache-beam-website-pull-requests.storage.googleapis.com/31701/blog/unit-testing-blog/index.html
Maybe something is malformatted? If you stage it locally with an older date, does it render?
If needed, let's temporarily set this to an older date so we can review the staged version.
numbers=[1,2,3]


with TestPipeline() as p:
output = p | beam.Create([1,2,3])
| beam.Map(compute_square)
assert_that(output, equal_to([1,4,9]))
Looks like this is poorly formatted (indentation needed, too much white space). I'd also recommend defining `examples=[1,2,3]` and `expected=[1,4,9]` and using those in the pipeline rather than explicit lists.
with TestPipeline() as p:
output = p | beam.Create(strings)
| beam.Map(str.strip)
assert_that(output,['Strawberry','Carrot','Eggplant'])
Same as above - formatting seems off, plus let's make an `expected=['Strawberry','Carrot','Eggplant']` instead of an explicit list here.
Also, we're missing an `equal_to`. Let's make sure we quickly run these test snippets locally before submitting.
I commented on this above, but let's ensure they're continuously run.
class TestBeam(unittest.TestCase):
def test_custom_function(self):
with TestPipeline() as p:
input = p | beam.ParDo(custom_function(("1","2","3"))
assert_that(input, equal_to(["1","2","3"]))
I find this example confusing: is `custom_function` just the identity function? We have examples of `assert_that` above; can we use that instead?
I don't think this works, but I'm also trying to understand what it's even trying to do. You can't apply a ParDo to a raw pipeline object. ParDo accepts a DoFn; is `custom_function(some_tuple)` returning a DoFn that yields the tuple? Again, if we actually ran this code we'd uncover these issues.
For more pointed guidance on testing on Beam/Dataflow, see the [Google Cloud documentation](https://cloud.google.com/dataflow/docs/guides/develop-and-test-pipelines).
Could we also point to some example tests here as well? The beam repo is probably the best place for example PTransform tests. For example, we have quite extensive RunInference tests:
class RunInferenceBaseTest(unittest.TestCase):
Thanks for the feedback. To address the code running as intended, I plan to create a Colab notebook with the runnable snippets and link it to the blog post (as well as check it in to the Beam repo). Will create after the holiday.
SGTM - at that point we may be able to ignore the date-related comments anyway and just validate that it does indeed render the blog.
I've added a Colab notebook and tested these examples locally. PTAL; once we have a consensus on the notebook examples, I will update the blog post and address the other comments.
" result = (\n", | ||
" p2\n", | ||
" | ReadFromText(\"/content/sample_data/anscombe.json\")\n", | ||
" | beam.Map(str.strip)\n", |
I wonder if a better way to do this would be to split `beam.Map(str.strip)` out into a separate function which can be called from the test. As it is, the test isn't actually invoking any of the code we've written.
A more interesting example might be:
def manipulate_strings(incoming_pcoll):
  return incoming_pcoll | beam.Map(str.strip) | beam.Map(str.upper)
The functions themselves don't need to be tested, but the beam transforms do. That would let you test the actual code you've written below with:
with TestPipeline() as p:
inputs = p | beam.Create(strings)
output = manipulate_strings(inputs)
assert_that(output, equal_to(expected))
Basically, I don't like that you could totally change the user code (which you're supposed to be testing) without a test failing.
Seeing `output = manipulate_strings(inputs)` makes me wonder how we could make composite PTransforms even easier/more natural.
> Basically, I don't like that you could totally change the user code (which you're supposed to be testing) without a test failing.

I see your point and think it's valid (I can make the change). Out of curiosity though, what if a user's entire transform was an inbuilt function (like `str.strip`)? Would the guidance be that they wouldn't need to test the Beam transform?
Co-authored-by: Danny McCormick <dannymccormick@google.com>
"# The following packages are imported for unit testing.\n", | ||
"import unittest\n", | ||
"import apache_beam as beam\n", | ||
"from apache_beam.testing.test_pipeline import TestPipeline\n", |
Is there any advantage to users of using `TestPipeline`?
AFAICT, no distinct advantages outside of the reasons mentioned here, as well as it being the de-facto choice for tests from previous documentation.
Thanks for writing this up, guidance on good unit testing practices for Beam is much needed.
" HttpError = None\n", | ||
"\n", | ||
"\n", | ||
"@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')\n", |
It would seem one of the main points of unit testing is to not have heavyweight dependencies. I wouldn't say skipping like this is best practice unless absolutely necessary.
"class TestBeam(unittest.TestCase):\n", | ||
"\n", | ||
"# This test corresponds to pipeline p1, and is written to confirm the compute_square function works as intended.\n", | ||
" def test_compute_square(self):\n", |
If `compute_square` is an ordinary Python function, I would recommend writing "ordinary" unit tests for it rather than testing it as part of a pipeline.
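For instance, assuming `compute_square` is a plain function that squares its argument (a hypothetical stand-in for the post's function), an ordinary test needs no pipeline at all:

```python
import unittest

def compute_square(x):
    # Hypothetical stand-in for the blog post's function.
    return x * x

class ComputeSquareTest(unittest.TestCase):
    def test_compute_square(self):
        # No TestPipeline needed: exercise the function directly.
        self.assertEqual(compute_square(3), 9)
        self.assertEqual(compute_square(-2), 4)
        self.assertEqual(compute_square(0), 0)
```

A pipeline-level test then only needs to cover the wiring (e.g. `beam.Map(compute_square)`), not the arithmetic itself.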
"cell_type": "code", | ||
"source": [ | ||
"# We import the mock package for mocking functionality.\n", | ||
"import mock\n", |
Mocking often interacts poorly with serialization; I would avoid this when possible. (Also, are these examples automatically tested?)
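One concrete way that clash shows up, as a sketch: Beam pickles DoFns and their captured state when shipping them to workers, and mock objects generally refuse to pickle.

```python
import pickle
from unittest import mock

mocked = mock.MagicMock()

# Each MagicMock instance belongs to a dynamically created class, so the
# pickle machinery cannot serialize it by reference. Anything like this
# captured inside a DoFn would fail when the pipeline is serialized.
try:
    pickle.dumps(mocked)
    picklable = True
except Exception:
    picklable = False

print("MagicMock picklable?", picklable)
```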
" with self.assertRaisesRegex(ValueError,\n", | ||
" \"Length of record does not match expected length'\"):\n", | ||
" p = beam.Pipeline()\n", | ||
" result = p | beam.ParDo(CustomClass.custom_function())\n", |
`CustomClass.custom_function()` returns a `DoFn`? I'm a bit confused at what you're trying to test here.
with beam.Pipeline(argv=self.args) as p:
result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
| beam.ParDo(lambda x: custom_function(x))
You can do `ParDo(lambda)`. I would look into how snippets are used in the programming guide to ensure the code is (and remains) correct.
class TestBeam(unittest.TestCase):
def test_custom_function(self):
with TestPipeline() as p:
input = p | beam.ParDo(custom_function(("1","2","3"))
Nit: If we want ints, let's use ints; if we want strings, let's use strings (like "a", "b", "c" or fruit names or whatever, not numeric strings).
#The following packages are used to run the example pipelines
import apache_beam as beam
import apache_beam.io.textio.ReadFromText
Java-style imports?
import apache_beam.io.textio.WriteToText

with beam.Pipeline(argv=self.args) as p:
result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
So, the problem is one can't really test this pipeline without reproducing it in the test. I think there are a couple of ways of re-structuring the code to make it more testable (and realistic).
(1) For an end-to-end test, put your pipeline in a function that takes the input and output paths as parameters. Your production code will call this with, e.g., GCS paths, but your test could create temporary directories and files and validate against those. This tests that your whole pipeline, including IOs, is structured correctly.
(2) Factor out the "processing" part of your pipeline into its own PTransform, or at least function. E.g. your pipeline would be
with beam.Pipeline(argv=self.args) as p:
_ = (p
| beam.io.ReadFromText("gs://my-storage-bucket/csv_location.csv")
| ProcessData(...)
| beam.io.WriteToText()
and then your unit test would look like
with beam.Pipeline(argv=self.args) as p:
  _ = (p
      | beam.Create(["some", "sample", "elements"])
      | ProcessData(...)
      | AssertEqualTo(["expected", "outputs"]))
or (equivalently, but less parallel)
with beam.Pipeline(argv=self.args) as p:
  output_pcoll = (p
      | beam.Create(["some", "sample", "elements"])
      | ProcessData(...))
  assert_that(output_pcoll, equal_to(...))
If writing a custom ProcessData PTransform is too much work, one could at least have
output_pcoll = process_data(input_pcoll)
@mock.patch.object(CustomFunction, 'get_record')
def test_error_message_wrong_length(self, get_record):
record = ["field1","field2",...]
get_record.return_value = record
I am not following this at all. Where is get_record being called? Why is it being mocked?
Hi @robertwb, thanks for taking a look! The blog and the Colab notebook are actually currently out of sync so that we could test runnable examples. Once aligned on those, I was going to update the blog. Anyhow, I will take a look at the comments and see where they apply.
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.
This blog post details opinionated examples and practices for unit testing in Beam. Examples use the Python SDK.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- Update `CHANGES.md` with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.