Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Enrichment Transform with BigTable handler #30001

Merged
merged 45 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
0ad521c
enrichment v1
riteshghorse Dec 15, 2023
e83bad7
add documentation
riteshghorse Dec 15, 2023
5162960
add doc comment
riteshghorse Dec 15, 2023
a45392d
rerun
riteshghorse Dec 18, 2023
9fdbeb3
update docs, lint
riteshghorse Dec 18, 2023
c541148
update docs, lint
riteshghorse Dec 18, 2023
5c9be0e
add generic type
riteshghorse Dec 18, 2023
9df679c
add generic type
riteshghorse Dec 18, 2023
883ff0d
adjust doc path
riteshghorse Dec 18, 2023
818bb8a
create test row
riteshghorse Dec 18, 2023
e1feeb8
use request type
riteshghorse Dec 18, 2023
40275e9
use request type
riteshghorse Dec 18, 2023
be67a88
change module name
riteshghorse Dec 20, 2023
27ed250
more tests
riteshghorse Jan 2, 2024
4af90f5
remove non-functional params
riteshghorse Jan 3, 2024
041fcd0
lint, doc
riteshghorse Jan 3, 2024
91f58b5
change types for general use
riteshghorse Jan 4, 2024
9fd6813
callable type
riteshghorse Jan 4, 2024
036eceb
dict type
riteshghorse Jan 4, 2024
021f9c4
update signatures
riteshghorse Jan 9, 2024
062b9ef
fix unit test
riteshghorse Jan 9, 2024
b11d3ea
bigtable with column family, ids, rrio-throttler
riteshghorse Jan 11, 2024
46e3a6d
update tests for row filter
riteshghorse Jan 11, 2024
433d5fa
convert handler types from dict to Row
riteshghorse Jan 11, 2024
d18d583
update tests for bigtable
riteshghorse Jan 12, 2024
c5e792c
ran pydocs
riteshghorse Jan 12, 2024
9285a5b
ran pydocs
riteshghorse Jan 12, 2024
641bdf7
mark postcommit
riteshghorse Jan 12, 2024
c36a21e
remove _test file, fix import
riteshghorse Jan 12, 2024
3989c16
enable postcommit
riteshghorse Jan 12, 2024
7102acd
add more tests
riteshghorse Jan 12, 2024
87e32bb
skip tests when dependencies are not installed
riteshghorse Jan 16, 2024
57efa52
add deleted imports from last commit
riteshghorse Jan 16, 2024
282c608
add skip test condition
riteshghorse Jan 16, 2024
deecdbc
fix import order, add TooManyRequests to try-catch
riteshghorse Jan 16, 2024
253633e
make throttler, repeater non-optional
riteshghorse Jan 16, 2024
932fae3
add exception level and tests
riteshghorse Jan 17, 2024
cf88d6f
correct pydoc statement
riteshghorse Jan 17, 2024
5b702d2
add throttle tests
riteshghorse Jan 17, 2024
18d9539
add bigtable improvements
riteshghorse Jan 18, 2024
6e251ce
default app_profile_id
riteshghorse Jan 18, 2024
20b8ba6
add documentation, ignore None assignment
riteshghorse Jan 18, 2024
27974b8
add to changes.md
riteshghorse Jan 18, 2024
7c9a03c
change test structure that throws exception, skip http test for now
riteshghorse Jan 18, 2024
5a626e3
drop postcommit trigger file
riteshghorse Jan 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions sdks/python/apache_beam/transforms/enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ def cross_join(left: Dict[str, Any], right: Dict[str, Any]) -> beam.Row:
elif left[k] != v:
_LOGGER.warning(
'%s exists in the input row as well the row fetched '
'from API but have different values. Using the input '
'value, you can override this behavior by passing a '
'custom `join_fn`.' % k)
'from API but have different values - %s and %s. Using the input '
'value (%s) for the enriched row. You can override this behavior by '
'passing a custom `join_fn` to Enrichment transform.' %
(k, left[k], v, left[k]))
return beam.Row(**left)


Expand Down
49 changes: 44 additions & 5 deletions sdks/python/apache_beam/transforms/enrichment_handlers/bigtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.
#
import logging
from enum import Enum
from typing import Any
from typing import Dict
from typing import Optional
Expand All @@ -28,11 +29,27 @@

__all__ = [
'EnrichWithBigTable',
'ExceptionLevel',
]

_LOGGER = logging.getLogger(__name__)


class ExceptionLevel(Enum):
  """Controls how the BigTable handler reacts when a query returns
  an empty row.

  Members:
  - WARNING_ONLY: log a warning about the empty result, but keep going.
  - RAISE: raise an exception for the empty result.
  - QUIET: continue silently — neither log nor raise.
  """
  # Explicit member values are kept stable so they stay
  # backward-compatible for any caller that persists them.
  WARNING_ONLY = 0
  RAISE = 1
  QUIET = 2


class EnrichWithBigTable(EnrichmentSourceHandler[beam.Row, beam.Row]):
"""EnrichWithBigTable is a handler for
:class:`apache_beam.transforms.enrichment.Enrichment` transform to interact
Expand All @@ -42,22 +59,30 @@ class EnrichWithBigTable(EnrichmentSourceHandler[beam.Row, beam.Row]):
project_id (str): GCP project-id of the BigTable cluster.
instance_id (str): GCP instance-id of the BigTable cluster.
table_id (str): GCP table-id of the BigTable.
row_key (str): unique row key for BigTable
row_key (str): unique row-key field name from the input `beam.Row` object
to use as `row_key` for BigTable querying.
row_filter: a ``:class:`google.cloud.bigtable.row_filters.RowFilter``` to
filter data read with ``read_row()``.
exception_level: a `enum.Enum` value from
``apache_beam.transforms.enrichment_handlers.bigtable.ExceptionLevel``
to set the level when an empty row is returned from the BigTable query.
Defaults to ``ExceptionLevel.QUIET``.
"""
def __init__(
    self,
    project_id: str,
    instance_id: str,
    table_id: str,
    row_key: str,
    row_filter: Optional[RowFilter] = None,
    exception_level: ExceptionLevel = ExceptionLevel.QUIET,
):
  """Stores the BigTable connection configuration for later use.

  No network call happens here; the client connection is established
  in ``__enter__``. See the class docstring for argument details.
  """
  # Private copies of every configuration value used by __call__.
  self._project_id = project_id
  self._instance_id = instance_id
  self._table_id = table_id
  self._row_key = row_key
  self._row_filter = row_filter
  self._exception_level = exception_level

def __enter__(self):
"""connect to the Google BigTable cluster."""
Expand All @@ -67,13 +92,14 @@ def __enter__(self):

def __call__(self, request: beam.Row, *args, **kwargs):
"""
Reads a row from the Google BigTable and returns
Reads a row from the GCP BigTable and returns
a `Tuple` of request and response.

Args:
request: the input `beam.Row` to enrich.
"""
response_dict: Dict[str, Any] = {}
row_key: str = ""
try:
request_dict = request._asdict()
row_key = str(request_dict[self._row_key]).encode()
Expand All @@ -82,9 +108,22 @@ def __call__(self, request: beam.Row, *args, **kwargs):
for cf_id, cf_v in row.cells.items():
response_dict[cf_id] = {}
for k, v in cf_v.items():
response_dict[cf_id][k.decode('utf-8')] = v[0].value.decode('utf-8')
response_dict[cf_id][k.decode('utf-8')] = \
v[0].value.decode('utf-8')
elif self._exception_level == ExceptionLevel.WARNING_ONLY:
_LOGGER.warning(
'no matching row found for row_key: %s '
'with row_filter: %s' % (row_key, self._row_filter))
elif self._exception_level == ExceptionLevel.RAISE:
raise ValueError(
'no matching row found for row_key: %s '
'with row_filter=%s' % (row_key, self._row_filter))
except KeyError:
raise KeyError('row_key %s not found in input PCollection.' % row_key)
except NotFound:
_LOGGER.warning('request row_key: %s not found')
raise NotFound(
'GCP BigTable cluster `%s:%s:%s` not found.' %
(self._project_id, self._instance_id, self._table_id))
except Exception as e:
raise e

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@

# pylint: disable=ungrouped-imports
try:
from google.api_core.exceptions import NotFound
from google.cloud.bigtable import Client
from google.cloud.bigtable.row_filters import ColumnRangeFilter
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import EnrichWithBigTable
from apache_beam.transforms.enrichment_handlers.bigtable import ExceptionLevel
except ImportError:
raise unittest.SkipTest('GCP BigTable dependencies are not installed.')

Expand Down Expand Up @@ -156,7 +158,10 @@ def test_enrichment_with_bigtable(self):
'product': ['product_id', 'product_name', 'product_stock'],
}
bigtable = EnrichWithBigTable(
self.project_id, self.instance_id, self.table_id, self.row_key)
project_id=self.project_id,
instance_id=self.instance_id,
table_id=self.table_id,
row_key=self.row_key)
with TestPipeline(is_integration_test=True) as test_pipeline:
_ = (
test_pipeline
Expand All @@ -178,10 +183,10 @@ def test_enrichment_with_bigtable_row_filter(self):
start_column = 'product_name'.encode()
column_filter = ColumnRangeFilter(self.column_family_id, start_column)
bigtable = EnrichWithBigTable(
self.project_id,
self.instance_id,
self.table_id,
self.row_key,
project_id=self.project_id,
instance_id=self.instance_id,
table_id=self.table_id,
row_key=self.row_key,
row_filter=column_filter)
with TestPipeline(is_integration_test=True) as test_pipeline:
_ = (
Expand All @@ -195,12 +200,15 @@ def test_enrichment_with_bigtable_row_filter(self):
expected_enriched_fields)))

def test_enrichment_with_bigtable_no_enrichment(self):
# row_key which is product_id=11 doesn't exist, so the enriched field
# won't be added. Hence, the response is same as the request.
expected_fields = ['sale_id', 'customer_id', 'product_id', 'quantity']
expected_enriched_fields = {}
bigtable = EnrichWithBigTable(
self.project_id, self.instance_id, self.table_id, self.row_key)
# row_key which is product_id=11 doesn't exist, so the enriched field
# won't be added. Hence, the response is same as the request.
project_id=self.project_id,
instance_id=self.instance_id,
table_id=self.table_id,
row_key=self.row_key)
req = [beam.Row(sale_id=1, customer_id=1, product_id=11, quantity=1)]
with TestPipeline(is_integration_test=True) as test_pipeline:
_ = (
Expand All @@ -217,31 +225,67 @@ def test_enrichment_with_bigtable_bad_row_filter(self):
# in case of a bad column filter, that is, incorrect column_family_id and
# columns, no enrichment is done. If the column_family is correct but not
# column names then all columns in that column_family are returned.
expected_fields = [
'sale_id',
'customer_id',
'product_id',
'quantity',
]
expected_enriched_fields = {}
start_column = 'car_name'.encode()
column_filter = ColumnRangeFilter('car_name', start_column)
bigtable = EnrichWithBigTable(
self.project_id,
self.instance_id,
self.table_id,
self.row_key,
project_id=self.project_id,
instance_id=self.instance_id,
table_id=self.table_id,
row_key=self.row_key,
row_filter=column_filter)
with TestPipeline(is_integration_test=True) as test_pipeline:
_ = (
test_pipeline
| "Create" >> beam.Create(self.req)
| "Enrich W/ BigTable" >> Enrichment(bigtable)
| "Validate Response" >> beam.ParDo(
ValidateResponse(
len(expected_fields),
expected_fields,
expected_enriched_fields)))
with self.assertRaises(NotFound):
with TestPipeline(is_integration_test=True) as test_pipeline:
_ = (
test_pipeline
| "Create" >> beam.Create(self.req)
| "Enrich W/ BigTable" >> Enrichment(bigtable))

def test_enrichment_with_bigtable_raises_key_error(self):
  """Expects a `KeyError` when the configured row_key field name is
  absent from the rows of the input PCollection."""
  # 'car_name' is not a field of the request rows, so the handler
  # cannot derive a BigTable row key from them.
  handler = EnrichWithBigTable(
      project_id=self.project_id,
      instance_id=self.instance_id,
      table_id=self.table_id,
      row_key='car_name')
  # assertRaises is entered first so it observes the error raised
  # when the pipeline context exits.
  with self.assertRaises(KeyError), \
       TestPipeline(is_integration_test=True) as pipeline:
    _ = (
        pipeline
        | "Create" >> beam.Create(self.req)
        | "Enrich W/ BigTable" >> Enrichment(handler))

def test_enrichment_with_bigtable_raises_not_found(self):
  """Expects a `NotFound` exception when the configured BigTable
  table does not exist."""
  # Point the handler at a table id that is not present in the cluster.
  handler = EnrichWithBigTable(
      project_id=self.project_id,
      instance_id=self.instance_id,
      table_id='invalid_table',
      row_key=self.row_key)
  # assertRaises is entered first so it observes the error raised
  # when the pipeline context exits.
  with self.assertRaises(NotFound), \
       TestPipeline(is_integration_test=True) as pipeline:
    _ = (
        pipeline
        | "Create" >> beam.Create(self.req)
        | "Enrich W/ BigTable" >> Enrichment(handler))

def test_enrichment_with_bigtable_exception_level(self):
  """Expects a `ValueError` when the BigTable query returns an empty
  row and the handler is configured with ExceptionLevel.RAISE."""
  handler = EnrichWithBigTable(
      project_id=self.project_id,
      instance_id=self.instance_id,
      table_id=self.table_id,
      row_key=self.row_key,
      exception_level=ExceptionLevel.RAISE)
  # product_id=11 has no matching row in the table, so the lookup
  # comes back empty and the handler raises.
  requests = [beam.Row(sale_id=1, customer_id=1, product_id=11, quantity=1)]
  # assertRaises is entered first so it observes the error raised
  # when the pipeline context exits.
  with self.assertRaises(ValueError), \
       TestPipeline(is_integration_test=True) as pipeline:
    _ = (
        pipeline
        | "Create" >> beam.Create(requests)
        | "Enrich W/ BigTable" >> Enrichment(handler))


if __name__ == '__main__':
Expand Down
Loading