diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index e1c509d0e490c..b897df2d32ab3 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -400,6 +400,7 @@ def chain_after(result): from apache_beam.io.iobase import SourceBundle from apache_beam.io.textio import _TextSource as TextSource from apache_beam.metrics import Metrics +from apache_beam.metrics.metric import Lineage from apache_beam.options import value_provider as vp from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import GoogleCloudOptions @@ -809,6 +810,11 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None): self.table_reference.get(), project=self._get_project()) elif not self.table_reference.projectId: self.table_reference.projectId = self._get_project() + Lineage.sources().add( + 'bigquery', + self.table_reference.projectId, + self.table_reference.datasetId, + self.table_reference.tableId) schema, metadata_list = self._export_files(bq) self.export_result = _BigQueryExportResult( @@ -1157,6 +1163,11 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None): self.table_reference.projectId, self.table_reference.datasetId, self.table_reference.tableId) + Lineage.sources().add( + "bigquery", + self.table_reference.projectId, + self.table_reference.datasetId, + self.table_reference.tableId) if self.use_native_datetime: requested_session.data_format = bq_storage.types.DataFormat.ARROW diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index e1a4af31f1c2e..3145fb5110680 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -40,6 +40,7 @@ from apache_beam.io import filesystems as fs from apache_beam.io.gcp import bigquery_tools from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata +from apache_beam.metrics.metric import Lineage from apache_beam.options import value_provider as vp from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.transforms import trigger @@ -564,6 +565,11 @@ def process_one(self, element, job_name_prefix): write_disposition = self.write_disposition wait_for_job = True self._observed_tables.add(copy_to_reference.tableId) + Lineage.sinks().add( + 'bigquery', + copy_to_reference.projectId, + copy_to_reference.datasetId, + copy_to_reference.tableId) else: wait_for_job = False write_disposition = 'WRITE_APPEND' @@ -735,6 +741,12 @@ def process( yield pvalue.TaggedOutput( TriggerLoadJobs.TEMP_TABLES, bigquery_tools.get_hashable_destination(table_reference)) + else: + Lineage.sinks().add( + 'bigquery', + table_reference.projectId, + table_reference.datasetId, + table_reference.tableId) _LOGGER.info( 'Triggering job %s to load data to BigQuery table %s.' diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py index f3881ed261ae3..f038b48e04d53 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py @@ -44,6 +44,7 @@ from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata from apache_beam.io.iobase import BoundedSource from apache_beam.io.textio import _TextSource +from apache_beam.metrics.metric import Lineage from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import ValueProvider @@ -261,6 +262,12 @@ def process(self, for metadata in metadata_list: yield self._create_source(metadata.path, schema) + Lineage.sources().add( + 'bigquery', + table_reference.projectId, + table_reference.datasetId, + table_reference.tableId) + if element.query is not None: self.bq._delete_table( table_reference.projectId, diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index 77cafb8bd64b7..e238b2c3b7876 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -28,6 +28,7 @@ # mypy: disallow-untyped-defs import logging +import re from typing import TYPE_CHECKING from typing import Dict from typing import FrozenSet @@ -50,7 +51,7 @@ from apache_beam.metrics.execution import MetricKey from apache_beam.metrics.metricbase import Metric -__all__ = ['Metrics', 'MetricsFilter'] +__all__ = ['Metrics', 'MetricsFilter', 'Lineage'] _LOGGER = logging.getLogger(__name__) @@ -305,3 +306,80 @@ def with_steps(self, steps: Iterable[str]) -> 'MetricsFilter': self._steps.update(steps) return self + + +class Lineage: + """Standard collection of metrics used to record source and sinks information + for lineage tracking.""" + + LINEAGE_NAMESPACE = "lineage" + SOURCE = "source" + SINK = "sink" + + _METRICS = { + SOURCE: Metrics.string_set(LINEAGE_NAMESPACE, SOURCE), + SINK: Metrics.string_set(LINEAGE_NAMESPACE, SINK) + } + + def __init__(self, label): + """Create a Lineage with valid babel (:data:`~Lineage.SOURCE` or + :data:`~Lineage.SINK`) + """ + self.metric = Lineage._METRICS[label] + + @classmethod + def sources(cls): + return cls(Lineage.SOURCE) + + @classmethod + def sinks(cls): + return cls(Lineage.SINK) + + _RESERVED_CHARS = re.compile(r'[:\s.]') + + @staticmethod + def wrap_segment(segment: str): + """Wrap segment to valid segment name. + + Specifically, If there are reserved chars (colon, whitespace, dot), escape + with backtick. If the segment is already wrapped, return the original. + """ + if segment.startswith("`") and segment.endswith("`"): return segment + if Lineage._RESERVED_CHARS.search(segment): + return "`" + segment + "`" + return segment + + @staticmethod + def get_fq_name( + system: str, *segments: str, route: Optional[str] = None) -> str: + """Assemble fully qualified name + (`FQN `_). + Format: + + - `system:segment1.segment2` + - `system:routine:segment1.segment2` + - `system:`segment1.with.dots:clons`.segment2` + + This helper method is for internal and testing usage only. + """ + segs = '.'.join(map(Lineage.wrap_segment, segments)) + if route: + return ':'.join((system, route, segs)) + return ':'.join((system, segs)) + + def add( + self, system: str, *segments: str, route: Optional[str] = None) -> None: + self.metric.add(self.get_fq_name(system, *segments, route=route)) + + @staticmethod + def query(results: MetricResults, label: str): + if not label in Lineage._METRICS: + raise ValueError("Label {} does not exist for Lineage", label) + response = results.query( + MetricsFilter().with_namespace(Lineage.LINEAGE_NAMESPACE).with_name( + label))[MetricResults.STRINGSETS] + result = set() + for metric in response: + result.update(metric.committed) + result.update(metric.attempted) + return result diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py index e3701228feecd..3a8da021101e5 100644 --- a/sdks/python/apache_beam/metrics/metric_test.py +++ b/sdks/python/apache_beam/metrics/metric_test.py @@ -28,6 +28,7 @@ from apache_beam.metrics.execution import MetricKey from apache_beam.metrics.execution import MetricsContainer from apache_beam.metrics.execution import MetricsEnvironment +from apache_beam.metrics.metric import Lineage from apache_beam.metrics.metric import MetricResults from apache_beam.metrics.metric import Metrics from apache_beam.metrics.metric import MetricsFilter @@ -248,5 +249,26 @@ def test_create_counter_distribution(self): sampler.stop() +class LineageTest(unittest.TestCase): + def test_fq_name(self): + test_cases = { + "apache-beam": "apache-beam", + "`apache-beam`": "`apache-beam`", + "apache.beam": "`apache.beam`", + "apache:beam": "`apache:beam`", + "apache beam": "`apache beam`", + "`apache beam`": "`apache beam`", + "apache\tbeam": "`apache\tbeam`", + "apache\nbeam": "`apache\nbeam`" + } + for k, v in test_cases.items(): + self.assertEqual("apache:" + v, Lineage.get_fq_name("apache", k)) + self.assertEqual( + "apache:beam:" + v, Lineage.get_fq_name("apache", k, route="beam")) + self.assertEqual( + "apache:beam:" + v + '.' + v, + Lineage.get_fq_name("apache", k, k, route="beam")) + + if __name__ == '__main__': unittest.main()