Skip to content

Commit

Permalink
Add Lineage metrics to Python BigQueryIO
Browse files Browse the repository at this point in the history
* Introduce metric.Lineage StringSet wrapper
  Reflect Java SDK apache#32090

* Direct Read

* Export Read

* ReadAllFromBigQuery

* FILE_LOAD Write
  • Loading branch information
Abacn committed Aug 8, 2024
1 parent 99a2383 commit bae780b
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 1 deletion.
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ def chain_after(result):
from apache_beam.io.iobase import SourceBundle
from apache_beam.io.textio import _TextSource as TextSource
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import Lineage
from apache_beam.options import value_provider as vp
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
Expand Down Expand Up @@ -809,6 +810,11 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
self.table_reference.get(), project=self._get_project())
elif not self.table_reference.projectId:
self.table_reference.projectId = self._get_project()
Lineage.sources().add(
'bigquery',
self.table_reference.projectId,
self.table_reference.datasetId,
self.table_reference.tableId)

schema, metadata_list = self._export_files(bq)
self.export_result = _BigQueryExportResult(
Expand Down Expand Up @@ -1157,6 +1163,11 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
self.table_reference.projectId,
self.table_reference.datasetId,
self.table_reference.tableId)
Lineage.sources().add(
"bigquery",
self.table_reference.projectId,
self.table_reference.datasetId,
self.table_reference.tableId)

if self.use_native_datetime:
requested_session.data_format = bq_storage.types.DataFormat.ARROW
Expand Down
12 changes: 12 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from apache_beam.io import filesystems as fs
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.metrics.metric import Lineage
from apache_beam.options import value_provider as vp
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.transforms import trigger
Expand Down Expand Up @@ -564,6 +565,11 @@ def process_one(self, element, job_name_prefix):
write_disposition = self.write_disposition
wait_for_job = True
self._observed_tables.add(copy_to_reference.tableId)
Lineage.sinks().add(
'bigquery',
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)
else:
wait_for_job = False
write_disposition = 'WRITE_APPEND'
Expand Down Expand Up @@ -735,6 +741,12 @@ def process(
yield pvalue.TaggedOutput(
TriggerLoadJobs.TEMP_TABLES,
bigquery_tools.get_hashable_destination(table_reference))
else:
Lineage.sinks().add(
'bigquery',
table_reference.projectId,
table_reference.datasetId,
table_reference.tableId)

_LOGGER.info(
'Triggering job %s to load data to BigQuery table %s.'
Expand Down
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.io.iobase import BoundedSource
from apache_beam.io.textio import _TextSource
from apache_beam.metrics.metric import Lineage
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import ValueProvider
Expand Down Expand Up @@ -261,6 +262,12 @@ def process(self,
for metadata in metadata_list:
yield self._create_source(metadata.path, schema)

Lineage.sources().add(
'bigquery',
table_reference.projectId,
table_reference.datasetId,
table_reference.tableId)

if element.query is not None:
self.bq._delete_table(
table_reference.projectId,
Expand Down
80 changes: 79 additions & 1 deletion sdks/python/apache_beam/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
# mypy: disallow-untyped-defs

import logging
import re
from typing import TYPE_CHECKING
from typing import Dict
from typing import FrozenSet
Expand All @@ -50,7 +51,7 @@
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.metricbase import Metric

__all__ = ['Metrics', 'MetricsFilter']
__all__ = ['Metrics', 'MetricsFilter', 'Lineage']

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -305,3 +306,80 @@ def with_steps(self, steps: Iterable[str]) -> 'MetricsFilter':

self._steps.update(steps)
return self


class Lineage:
  """Standard collection of metrics used to record source and sink information
  for lineage tracking.

  Each instance wraps one of two string-set metrics in the
  :data:`~Lineage.LINEAGE_NAMESPACE` namespace. Obtain an instance through
  :meth:`sources` or :meth:`sinks` and report an entity with :meth:`add`.
  """

  LINEAGE_NAMESPACE = "lineage"
  SOURCE = "source"
  SINK = "sink"

  # One string-set metric per label, created once when the class is defined
  # and shared by every Lineage instance built for that label.
  _METRICS = {
      SOURCE: Metrics.string_set(LINEAGE_NAMESPACE, SOURCE),
      SINK: Metrics.string_set(LINEAGE_NAMESPACE, SINK)
  }

  # Characters that force a segment to be quoted: colon, whitespace, dot.
  _RESERVED_CHARS = re.compile(r'[:\s.]')

  def __init__(self, label: str) -> None:
    """Create a Lineage with a valid label (:data:`~Lineage.SOURCE` or
    :data:`~Lineage.SINK`).

    Args:
      label: which lineage metric this instance reports to.

    Raises:
      KeyError: if ``label`` is not one of the known labels.
    """
    self.metric = Lineage._METRICS[label]

  @classmethod
  def sources(cls) -> 'Lineage':
    """Return a Lineage that records to the :data:`~Lineage.SOURCE` metric."""
    return cls(Lineage.SOURCE)

  @classmethod
  def sinks(cls) -> 'Lineage':
    """Return a Lineage that records to the :data:`~Lineage.SINK` metric."""
    return cls(Lineage.SINK)

  @staticmethod
  def wrap_segment(segment: str) -> str:
    """Wrap segment to valid segment name.

    Specifically, if there are reserved chars (colon, whitespace, dot), escape
    with backtick. If the segment is already wrapped, return the original.

    Args:
      segment: a single component of a fully qualified name.

    Returns:
      The segment, enclosed in backticks when it needed quoting.
    """
    # Already quoted by the caller: leave untouched to avoid double wrapping.
    if segment.startswith("`") and segment.endswith("`"):
      return segment
    if Lineage._RESERVED_CHARS.search(segment):
      return "`" + segment + "`"
    return segment

  @staticmethod
  def get_fq_name(
      system: str, *segments: str, route: Optional[str] = None) -> str:
    """Assemble fully qualified name
    (`FQN <https://cloud.google.com/data-catalog/docs/fully-qualified-names>`_).
    Format:

    - `system:segment1.segment2`
    - `system:route:segment1.segment2`
    - `system:`segment1.with.dots:colons`.segment2`

    This helper method is for internal and testing usage only.

    Args:
      system: leading component of the name, e.g. ``'bigquery'``.
      *segments: name components; each is passed through
        :meth:`wrap_segment` and the results are joined with dots.
      route: optional component placed between ``system`` and the segments.
        Omitted from the name when ``None`` or empty.

    Returns:
      The assembled fully qualified name.
    """
    segs = '.'.join(map(Lineage.wrap_segment, segments))
    if route:
      return ':'.join((system, route, segs))
    return ':'.join((system, segs))

  def add(
      self, system: str, *segments: str, route: Optional[str] = None) -> None:
    """Add the fully qualified name built from the arguments to this metric.

    The arguments have the same meaning as in :meth:`get_fq_name`.
    """
    self.metric.add(self.get_fq_name(system, *segments, route=route))

  @staticmethod
  def query(results: MetricResults, label: str) -> set:
    """Collect the lineage values reported under ``label``.

    Args:
      results: the metric results to query.
      label: :data:`~Lineage.SOURCE` or :data:`~Lineage.SINK`.

    Returns:
      The union of the committed and attempted values of every matching
      string-set metric.

    Raises:
      ValueError: if ``label`` is not a known lineage label.
    """
    if label not in Lineage._METRICS:
      # Format the label into the message; passing it as a second argument
      # to ValueError would leave the "{}" placeholder unfilled.
      raise ValueError("Label {} does not exist for Lineage".format(label))
    response = results.query(
        MetricsFilter().with_namespace(Lineage.LINEAGE_NAMESPACE).with_name(
            label))[MetricResults.STRINGSETS]
    result = set()
    for metric in response:
      result.update(metric.committed)
      result.update(metric.attempted)
    return result
22 changes: 22 additions & 0 deletions sdks/python/apache_beam/metrics/metric_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metric import Lineage
from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metric import Metrics
from apache_beam.metrics.metric import MetricsFilter
Expand Down Expand Up @@ -248,5 +249,26 @@ def test_create_counter_distribution(self):
sampler.stop()


class LineageTest(unittest.TestCase):
  """Checks how Lineage.get_fq_name quotes segments and joins the parts."""
  def test_fq_name(self):
    # Raw segment -> the form it is expected to take inside the assembled
    # name. Segments with a dot, colon or whitespace get backtick-quoted;
    # already-quoted and plain segments pass through unchanged.
    wrapped_by_raw = {
        "apache-beam": "apache-beam",
        "`apache-beam`": "`apache-beam`",
        "apache.beam": "`apache.beam`",
        "apache:beam": "`apache:beam`",
        "apache beam": "`apache beam`",
        "`apache beam`": "`apache beam`",
        "apache\tbeam": "`apache\tbeam`",
        "apache\nbeam": "`apache\nbeam`"
    }
    for raw, wrapped in wrapped_by_raw.items():
      # System plus a single segment.
      system_only = Lineage.get_fq_name("apache", raw)
      self.assertEqual("apache:" + wrapped, system_only)

      # A route is inserted between the system and the segment.
      with_route = Lineage.get_fq_name("apache", raw, route="beam")
      self.assertEqual("apache:beam:" + wrapped, with_route)

      # Multiple segments are wrapped individually and joined with a dot.
      two_segments = Lineage.get_fq_name("apache", raw, raw, route="beam")
      self.assertEqual("apache:beam:" + wrapped + '.' + wrapped, two_segments)


# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()

0 comments on commit bae780b

Please sign in to comment.