Unit Testing in Beam Blog Post (#32412)
* blog post draft

* Created using Colab

* colab path

* Created using Colab

* path for colab

* whitespace

* grammar fixes.

* Created using Colab

* path to colab

* date + code link

* Update website/www/site/content/en/blog/unit-testing-in-beam.md

Co-authored-by: Rebecca Szper <98840847+rszper@users.noreply.github.com>

* Update website/www/site/content/en/blog/unit-testing-in-beam.md

Co-authored-by: Rebecca Szper <98840847+rszper@users.noreply.github.com>

* Update website/www/site/content/en/blog/unit-testing-in-beam.md

Co-authored-by: Rebecca Szper <98840847+rszper@users.noreply.github.com>

* Update website/www/site/content/en/blog/unit-testing-in-beam.md

Co-authored-by: Rebecca Szper <98840847+rszper@users.noreply.github.com>

---------

Co-authored-by: Rebecca Szper <98840847+rszper@users.noreply.github.com>
svetakvsundhar and rszper committed Sep 12, 2024
1 parent 4ee2606 commit 02af7d4
Showing 2 changed files with 331 additions and 125 deletions.
258 changes: 133 additions & 125 deletions examples/notebooks/blog/unittests_in_beam.ipynb
@@ -4,7 +4,7 @@
"metadata": {
"colab": {
"provenance": [],
"authorship_tag": "ABX9TyM16129G+tIfKxNIGenSDL1",
"authorship_tag": "ABX9TyNKlk6MKeCAFiaFkcs9pvkB",
"include_colab_link": true
},
"kernelspec": {
@@ -85,136 +85,28 @@
"source": [
"**Example 1**\n",
"\n",
"This `DoFn` (and corresponding pipeline) is used to convey a situation in which a `DoFn` makes an API call. Note that an error is raised here if the length of the API response (returned_record) is less than length 10."
"The following example shows how to use the `Map` construct to calculate median house value per bedroom.\n"
],
"metadata": {
"id": "Z8__izORM3r8"
"id": "IVjBkewt1sLA"
}
},
{
"cell_type": "code",
"source": [
"# Fake client to simulate an external call\n",
"\n",
"import time\n",
"class Client():\n",
" def get_data(self, api):\n",
" time.sleep(3)\n",
" return [0,1,2,3,4,5,6,7,8,9]\n",
"\n",
"MyApiCall = Client()"
],
"metadata": {
"id": "GGPF7cY3Ntyj"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"#The following packages are used to run the example pipelines\n",
"\n",
"import apache_beam as beam\n",
"from apache_beam.io import ReadFromText, WriteToText\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"\n",
"class MyDoFn(beam.DoFn):\n",
" def process(self,element):\n",
" returned_record = MyApiCall.get_data(\"http://my-api-call.com\")\n",
" if len(returned_record)!=10:\n",
" raise ValueError(\"Length of record does not match expected length\")\n",
" yield returned_record\n",
"\n",
"with beam.Pipeline() as p:\n",
" result = (\n",
" p\n",
" | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
" | beam.ParDo(MyDoFn())\n",
" | WriteToText(\"/content/example1\")\n",
" )"
],
"metadata": {
"id": "Ktk9EVIFzGfP"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"**Mocking Example**\n",
"\n",
"The following blocks of code illustrate how we can mock an API response, to test out the error message we've written. Note that we can use mocking to avoid making the actual API call in our test."
],
"metadata": {
"id": "58GVMyMa2PwE"
}
},
{
"cell_type": "code",
"source": [
"!pip install mock # Install the 'mock' module"
],
"metadata": {
"id": "ESclJ_G-6JcW"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"# We import the mock package for mocking functionality.\n",
"from unittest.mock import Mock,patch\n",
"# from MyApiCall import get_data\n",
"import mock\n",
"\n",
"\n",
"# MyApiCall is a function that calls get_data to fetch some data via an API call.\n",
"@patch('MyApiCall.get_data')\n",
"def test_error_message_wrong_length(self, mock_get_data):\n",
" response = ['field1','field2']\n",
" mock_get_data.return_value = Mock()\n",
" mock_get_data.return_value.json.return_value=response\n",
"\n",
" input_elements = ['-122.050000,37.370000,27.000000,3885.000000,661.000000,1537.000000,606.000000,6.608500,344700.000000'] #input length 9\n",
" with self.assertRaisesRegex(ValueError,\n",
" \"Length of record does not match expected length'\"):\n",
" p = beam.Pipeline()\n",
" result = p | beam.create(input_elements) | beam.ParDo(MyDoFn())\n",
" result\n"
],
"metadata": {
"id": "IRuv8s8a2O8F"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"**Example 2**\n",
"\n",
"The following example shows how we can use the `Map` construct to calculate median house value per bedroom.\n"
],
"metadata": {
"id": "IVjBkewt1sLA"
}
},
{
"cell_type": "code",
"source": [
"# The following code computes the median house value per bedroom\n",
"def median_house_value_per_bedroom(element):\n",
" # median_house_value is at index 8 and total_bedrooms is at index 4\n",
" element = element.strip().split(',')\n",
" return float(element[8])/float(element[4])\n",
"\n",
"\n",
"with beam.Pipeline() as p2:\n",
"with beam.Pipeline() as p1:\n",
" result = (\n",
" p2\n",
" p1\n",
" | ReadFromText(\"/content/sample_data/california_housing_test.csv\",skip_header_lines=1)\n",
" | beam.Map(median_house_value_per_bedroom)\n",
" | WriteToText(\"/content/example2\")\n",
@@ -229,9 +121,9 @@
{
"cell_type": "markdown",
"source": [
"**Example 3**\n",
"**Example 2**\n",
"\n",
"The following code is an extension of example 2, but with more complex pipeline logic. Thus, you will see that the `median_house_value_per_bedroom` function is now more complex, and involves writing to various keys."
"The following code is an extension of example 1, but with more complex pipeline logic. The `median_house_value_per_bedroom` function is now more complex, and involves writing to various keys."
],
"metadata": {
"id": "Mh3nZZ1_12sX"
@@ -241,7 +133,7 @@
"cell_type": "code",
"source": [
"import random\n",
"# The following code computes the median house value per bedroom\n",
"# The following code computes the median house value per bedroom.\n",
"counter=-1 #define a counter globally\n",
"\n",
"\n",
@@ -260,9 +152,9 @@
" return (key,value*10)\n",
"\n",
"\n",
"with beam.Pipeline() as p3:\n",
"with beam.Pipeline() as p2:\n",
" result = (\n",
" p3\n",
" p2\n",
" | ReadFromText(\"/content/sample_data/california_housing_test.csv\",skip_header_lines=1)\n",
" | beam.Map(median_house_value_per_bedroom)\n",
" | beam.Map(multiply_by_factor)\n",
@@ -294,14 +186,14 @@
" | beam.Map(multiply_by_factor)\n",
" | beam.CombinePerKey(sum))\n",
"\n",
"# Define a new class that inherits from beam.PTransform\n",
"# Define a new class that inherits from beam.PTransform.\n",
"class MapAndCombineTransform(beam.PTransform):\n",
" def expand(self, pcoll):\n",
" return transform_data_set(pcoll)\n",
"\n",
"with beam.Pipeline() as p3:\n",
"with beam.Pipeline() as p2:\n",
" result = (\n",
" p3\n",
" p2\n",
" | ReadFromText(\"/content/sample_data/california_housing_test.csv\",skip_header_lines=1)\n",
" | MapAndCombineTransform() # Use the new PTransform class\n",
" | WriteToText(\"/content/example3\")\n",
@@ -316,7 +208,7 @@
{
"cell_type": "markdown",
"source": [
"**Unit Test for Pipeline 3**\n",
"**Unit Test for Pipeline 2**\n",
"\n",
"The following test populates some sample records and defines the expected output value."
],
@@ -328,7 +220,6 @@
"cell_type": "code",
"source": [
"import unittest\n",
"import apache_beam as beam\n",
"from apache_beam.testing.test_pipeline import TestPipeline\n",
"from apache_beam.testing.util import assert_that, equal_to\n",
"\n",
@@ -344,9 +235,9 @@
" '122.05,100.99,24.30,40.5,56.55,42.01,11,35,75.30,92.91',\n",
" '-120.05,39.37,29.00,4085.00,681.00,1557.00,626.00,6.8085,364700.00'\n",
" ]\n",
" with beam.Pipeline() as p3:\n",
" with beam.Pipeline() as p2:\n",
" result = (\n",
" p3\n",
" p2\n",
" | beam.Create(input_elements)\n",
" | MapAndCombineTransform()\n",
" )\n",
@@ -357,6 +248,123 @@
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"**Example 3**\n",
"\n",
"This `DoFn` and the corresponding pipeline demonstrate a `DoFn` making an API call. An error occurs if the length of the API response (`returned_record`) is not exactly `10`."
],
"metadata": {
"id": "Z8__izORM3r8"
}
},
{
"cell_type": "code",
"source": [
"# Fake client to simulate an external call\n",
"\n",
"import time\n",
"class Client():\n",
" def get_data(self, api):\n",
" time.sleep(3)\n",
" return [0,1,2,3,4,5,6,7,8,9]\n",
"\n",
"MyApiCall = Client()"
],
"metadata": {
"id": "GGPF7cY3Ntyj"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"**Note:** The following cell can take about 2 minutes to run."
],
"metadata": {
"id": "3tGnPucbzmEx"
}
},
{
"cell_type": "code",
"source": [
"# The following packages are used to run the example pipelines.\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"\n",
"class MyDoFn(beam.DoFn):\n",
" def process(self,element):\n",
" returned_record = MyApiCall.get_data(\"http://my-api-call.com\")\n",
" if len(returned_record)!=10:\n",
" raise ValueError(\"Length of record does not match expected length\")\n",
" yield returned_record\n",
"\n",
"with beam.Pipeline() as p3:\n",
" result = (\n",
" p3\n",
" | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
" | beam.ParDo(MyDoFn())\n",
" | WriteToText(\"/content/example1\")\n",
" )"
],
"metadata": {
"id": "Ktk9EVIFzGfP"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"**Mocking Example**\n",
"\n",
"To test the error message, mock an API response, as demonstrated in the following blocks of code. Use mocking to avoid making the actual API call in the test."
],
"metadata": {
"id": "58GVMyMa2PwE"
}
},
{
"cell_type": "code",
"source": [
"!pip install mock # Install the 'mock' module."
],
"metadata": {
"id": "ESclJ_G-6JcW"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"# Import the mock package for mocking functionality.\n",
"from unittest.mock import Mock,patch\n",
"# from MyApiCall import get_data\n",
"import mock\n",
"\n",
"\n",
"# MyApiCall is a Client instance whose get_data method fetches some data by using an API call.\n",
"@patch('MyApiCall.get_data')\n",
"def test_error_message_wrong_length(self, mock_get_data):\n",
" response = ['field1','field2']\n",
" mock_get_data.return_value = Mock()\n",
" mock_get_data.return_value.json.return_value=response\n",
"\n",
" input_elements = ['-122.050000,37.370000,27.000000,3885.000000,661.000000,1537.000000,606.000000,6.608500,344700.000000'] #input length 9\n",
" with self.assertRaisesRegex(ValueError,\n",
" \"Length of record does not match expected length\"):\n",
" p3 = beam.Pipeline()\n",
" result = p3 | beam.Create(input_elements) | beam.ParDo(MyDoFn())\n",
" result\n"
],
"metadata": {
"id": "IRuv8s8a2O8F"
},
"execution_count": null,
"outputs": []
}
]
}
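
Note on the mocking cell: the length check in `MyDoFn.process` can also be exercised without running a pipeline. The following is a minimal sketch using only the standard library, not the notebook's own test. It assumes a hypothetical plain function `check_record` that mirrors the body of `MyDoFn.process`, and it reproduces the notebook's fake `Client` without the `time.sleep(3)` delay. The names `check_record`, `CheckRecordTest`, and `run_tests` are illustrative and do not appear in the commit.

```python
import unittest
from unittest.mock import patch


class Client:
    """Stand-in for the notebook's fake API client (sleep omitted for speed)."""

    def get_data(self, api):
        return [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


MyApiCall = Client()


def check_record(element):
    """Hypothetical plain-function version of the check in MyDoFn.process."""
    returned_record = MyApiCall.get_data("http://my-api-call.com")
    if len(returned_record) != 10:
        raise ValueError("Length of record does not match expected length")
    return returned_record


class CheckRecordTest(unittest.TestCase):
    # patch.object swaps get_data on the existing Client instance for the
    # duration of the test, so the real method is never called.
    @patch.object(MyApiCall, "get_data")
    def test_error_message_wrong_length(self, mock_get_data):
        # A response of length 2 should trigger the ValueError.
        mock_get_data.return_value = ["field1", "field2"]
        with self.assertRaisesRegex(
            ValueError, "Length of record does not match expected length"
        ):
            check_record("any element")
        mock_get_data.assert_called_once_with("http://my-api-call.com")

    def test_expected_length_passes(self):
        # The unpatched fake client returns 10 items, so no error is raised.
        self.assertEqual(len(check_record("any element")), 10)


def run_tests():
    """Run the test case and return the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(CheckRecordTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

Calling `run_tests()` runs both tests. Here `patch.object` is applied to the `MyApiCall` instance directly, whereas `@patch('MyApiCall.get_data')` takes a string target that is resolved as an import path.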