
Commit 736cf50: fixes
svetakvsundhar committed Jun 28, 2024
1 parent 4684ecf
Showing 1 changed file with 9 additions and 9 deletions: website/www/site/content/en/blog/unit-testing-blog.md
## So You Want to Write Tests on Your Beam Pipeline?
Testing remains one of the most fundamental components of software engineering. In this blog post, we shed light on the constructs that Apache Beam provides for testing, and we cover an opinionated set of best practices for writing unit tests for your data pipeline. This post does not cover integration tests; those should be authored separately. The examples in this post are in Python, but the concepts translate broadly across SDKs.

Suppose we write a pipeline built around a PTransform that reads data from a CSV file, passes each record through a custom parsing function, and writes the results to a Google Cloud Storage bucket (we need some custom data formatting to prepare the data for a downstream application).


The pipeline is structured as follows:

### Example pipeline

# The following packages are used to run the example pipelines.
import apache_beam as beam
from apache_beam.io.textio import ReadFromText, WriteToText

with beam.Pipeline() as p:
    result = (
        p
        | ReadFromText("gs://my-storage-bucket/csv_location.csv")
        | beam.ParDo(lambda x: custom_function(x))
        | WriteToText("gs://my-output-bucket-location/")
    )

We then add a custom function to our code:
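The function's definition is collapsed in this diff. A minimal sketch of what such a parsing function might look like follows; the splitting and formatting logic is an assumption for illustration, not the original code:

def custom_function(element):
    # Hypothetical parsing logic: split a CSV line into fields and
    # rejoin them in the format a downstream application expects.
    fields = element.split(',')
    return ','.join(field.strip() for field in fields)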

In this scenario, we recommend the following best practices:
1. You don't need to write unit tests for the connectors already supported in the Beam library, such as ReadFromBigQuery and WriteToGCS. These connectors are already tested in Beam's test suite to ensure correct functionality.
2. You should write unit tests for any custom operations introduced in the pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding tests for any lambda functions used within these Beam primitives. Additionally, even if you're using a predefined function, treat the entire transform as a unit and test it (see the sketch after this list).
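
To illustrate the second point, here is a minimal sketch of testing an entire transform as a unit. The NormalizeNames transform and its contents are hypothetical, not from the original post:

import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

class NormalizeNames(beam.PTransform):
    # Hypothetical composite transform: strip whitespace, then lowercase.
    def expand(self, pcoll):
        return (
            pcoll
            | beam.Map(str.strip)
            | beam.Map(str.lower)
        )

class TestNormalizeNames(unittest.TestCase):
    def test_normalize_names(self):
        with TestPipeline() as p:
            output = p | beam.Create([' Ada \n', ' Grace ']) | NormalizeNames()
            assert_that(output, equal_to(['ada', 'grace']))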

### Example Pipeline 1
Let's use the following pipeline as an example. Because it applies a custom function, we should write a unit test to ensure that our function works as intended.

def compute_square(element):
    # Body elided in this diff; inferred from the test below, which expects [1, 4, 9].
    return element * element

with beam.Pipeline() as p1:
    result = (
        p1
        # The input source is elided in this diff; beam.Create is shown as a stand-in.
        | beam.Create([1, 2, 3])
        | beam.Map(compute_square)
        | WriteToText("gs://my-output-bucket-location/")
    )

### Example Pipeline 2

Now let's use the following pipeline as another example. Because it uses a predefined function, we don't need to unit test the function itself: `str.strip` is tested elsewhere. However, we do need to test the output of the `beam.Map` step.
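
Pipeline 2's code is collapsed in this diff. A sketch of what it likely looks like, mirroring pipeline 1 (the input source and output sink are assumptions):

with beam.Pipeline() as p2:
    result = (
        p2
        | beam.Create([' Strawberry \n', ' Carrot \n', ' Eggplant \n'])
        | beam.Map(str.strip)
        | WriteToText("gs://my-output-bucket-location/")
    )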

Here are the corresponding tests for both pipelines:
# Imports needed to run the tests; collapsed in this diff and reconstructed here.
import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

try:
    from apitools.base.py.exceptions import HttpError
except ImportError:
    HttpError = None


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBeam(unittest.TestCase):

    # This test corresponds to pipeline p1, and is written to confirm that
    # the compute_square function (defined earlier in the post) works as intended.
    def test_compute_square(self):
        numbers = [1, 2, 3]
        with TestPipeline() as p:
            output = (
                p
                | beam.Create(numbers)
                | beam.Map(compute_square)
            )
            assert_that(output, equal_to([1, 4, 9]))

    # This test corresponds to pipeline p2, and confirms the output of
    # beam.Map(str.strip). The method name below is reconstructed; the
    # original is collapsed in this diff.
    def test_strip(self):
        strings = [' Strawberry \n', ' Carrot \n', ' Eggplant \n']
        with TestPipeline() as p:
            output = (
                p
                | beam.Create(strings)
                | beam.Map(str.strip)
            )
            assert_that(output, equal_to(['Strawberry', 'Carrot', 'Eggplant']))
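
Assuming the tests live in a file named test_beam_pipeline.py (a hypothetical name), they can be run with the standard unittest runner:

python -m unittest test_beam_pipeline.py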


