Add Lineage metrics to Python BigQueryIO #32116

Merged
merged 4 commits on Aug 14, 2024
Changes from 3 commits
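This PR reports the BigQuery tables read or written by the Python SDK as Lineage metrics: read paths call `Lineage.sources().add('bigquery', project, dataset, table)` and write paths call `Lineage.sinks().add(...)`, and the reported values can be read back from the pipeline result. Below is a minimal sketch (not part of the PR) of querying the reported source lineage after a run; the project, dataset, table, and bucket names are placeholders.

```python
import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from apache_beam.metrics.metric import Lineage

# Placeholder table and bucket names, for illustration only.
with beam.Pipeline() as p:
    _ = p | ReadFromBigQuery(
        table='my-project:my_dataset.my_table',
        gcs_location='gs://my-bucket/tmp')

# After the pipeline finishes, the tables it read from are reported as
# lineage metrics and can be queried from the pipeline result, e.g.
# {'bigquery:my-project.my_dataset.my_table'}.
print(Lineage.query(p.result.metrics(), Lineage.SOURCE))
```

Writes report sink lineage analogously and can be queried with `Lineage.query(result.metrics(), Lineage.SINK)`, as the tests in this PR do.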
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -400,6 +400,7 @@ def chain_after(result):
from apache_beam.io.iobase import SourceBundle
from apache_beam.io.textio import _TextSource as TextSource
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import Lineage
from apache_beam.options import value_provider as vp
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -809,6 +810,11 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
self.table_reference.get(), project=self._get_project())
elif not self.table_reference.projectId:
self.table_reference.projectId = self._get_project()
Lineage.sources().add(
'bigquery',
self.table_reference.projectId,
self.table_reference.datasetId,
self.table_reference.tableId)

schema, metadata_list = self._export_files(bq)
self.export_result = _BigQueryExportResult(
@@ -1157,6 +1163,11 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
self.table_reference.projectId,
self.table_reference.datasetId,
self.table_reference.tableId)
Lineage.sources().add(
"bigquery",
self.table_reference.projectId,
self.table_reference.datasetId,
self.table_reference.tableId)

if self.use_native_datetime:
requested_session.data_format = bq_storage.types.DataFormat.ARROW
12 changes: 12 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -40,6 +40,7 @@
from apache_beam.io import filesystems as fs
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.metrics.metric import Lineage
from apache_beam.options import value_provider as vp
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.transforms import trigger
@@ -564,6 +565,11 @@ def process_one(self, element, job_name_prefix):
write_disposition = self.write_disposition
wait_for_job = True
self._observed_tables.add(copy_to_reference.tableId)
Lineage.sinks().add(
'bigquery',
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)
else:
wait_for_job = False
write_disposition = 'WRITE_APPEND'
@@ -735,6 +741,12 @@ def process(
yield pvalue.TaggedOutput(
TriggerLoadJobs.TEMP_TABLES,
bigquery_tools.get_hashable_destination(table_reference))
else:
Lineage.sinks().add(
'bigquery',
table_reference.projectId,
table_reference.datasetId,
table_reference.tableId)

_LOGGER.info(
'Triggering job %s to load data to BigQuery table %s.'
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -42,6 +42,7 @@
from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
from apache_beam.metrics.metric import Lineage
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner
@@ -508,6 +509,9 @@ def test_load_job_id_used(self):
| "GetJobs" >> beam.Map(lambda x: x[1])

assert_that(jobs, equal_to([job_reference]), label='CheckJobProjectIds')
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SINK),
set(["bigquery:project1.dataset1.table1"]))

def test_load_job_id_use_for_copy_job(self):
destination = 'project1:dataset1.table1'
@@ -560,6 +564,9 @@ def test_load_job_id_use_for_copy_job(self):
job_reference
]),
label='CheckCopyJobProjectIds')
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SINK),
set(["bigquery:project1.dataset1.table1"]))

@mock.patch('time.sleep')
def test_wait_for_load_job_completion(self, sleep_mock):
@@ -717,6 +724,9 @@ def test_multiple_partition_files(self):
copy_jobs | "CountCopyJobs" >> combiners.Count.Globally(),
equal_to([6]),
label='CheckCopyJobCount')
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SINK),
set(["bigquery:project1.dataset1.table1"]))

@parameterized.expand([
param(write_disposition=BigQueryDisposition.WRITE_TRUNCATE),
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
@@ -44,6 +44,7 @@
from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.io.iobase import BoundedSource
from apache_beam.io.textio import _TextSource
from apache_beam.metrics.metric import Lineage
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import ValueProvider
@@ -261,6 +262,12 @@ def process(self,
for metadata in metadata_list:
yield self._create_source(metadata.path, schema)

Lineage.sources().add(
'bigquery',
table_reference.projectId,
table_reference.datasetId,
table_reference.tableId)

if element.query is not None:
self.bq._delete_table(
table_reference.projectId,
18 changes: 6 additions & 12 deletions sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
@@ -136,12 +136,11 @@ def test_bad_schema_public_api_export(self, get_table):
with self.assertRaisesRegex(ValueError,
"Encountered an unsupported type: 'DOUBLE'"):
p = apache_beam.Pipeline()
pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table="dataset.sample_table",
method="EXPORT",
project="project",
output_type='BEAM_ROW')
pipeline

@mock.patch.object(BigQueryWrapper, 'get_table')
def test_bad_schema_public_api_direct_read(self, get_table):
@@ -159,21 +158,19 @@ def test_bad_schema_public_api_direct_read(self, get_table):
with self.assertRaisesRegex(ValueError,
"Encountered an unsupported type: 'DOUBLE'"):
p = apache_beam.Pipeline()
pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table="dataset.sample_table",
method="DIRECT_READ",
project="project",
output_type='BEAM_ROW')
pipeline

def test_unsupported_value_provider(self):
with self.assertRaisesRegex(TypeError,
'ReadFromBigQuery: table must be of type string'
'; got ValueProvider instead'):
p = apache_beam.Pipeline()
pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table=value_provider.ValueProvider(), output_type='BEAM_ROW')
pipeline

def test_unsupported_callable(self):
def filterTable(table):
@@ -185,35 +182,32 @@ def filterTable(table):
'ReadFromBigQuery: table must be of type string'
'; got a callable instead'):
p = apache_beam.Pipeline()
pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table=res, output_type='BEAM_ROW')
pipeline

def test_unsupported_query_export(self):
with self.assertRaisesRegex(
ValueError,
"Both a query and an output type of 'BEAM_ROW' were specified. "
"'BEAM_ROW' is not currently supported with queries."):
p = apache_beam.Pipeline()
pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table="project:dataset.sample_table",
method="EXPORT",
query='SELECT name FROM dataset.sample_table',
output_type='BEAM_ROW')
pipeline

def test_unsupported_query_direct_read(self):
with self.assertRaisesRegex(
ValueError,
"Both a query and an output type of 'BEAM_ROW' were specified. "
"'BEAM_ROW' is not currently supported with queries."):
p = apache_beam.Pipeline()
pipeline = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
_ = p | apache_beam.io.gcp.bigquery.ReadFromBigQuery(
table="project:dataset.sample_table",
method="DIRECT_READ",
query='SELECT name FROM dataset.sample_table',
output_type='BEAM_ROW')
pipeline

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
43 changes: 43 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -50,6 +50,7 @@
from apache_beam.io.gcp.bigquery import TableRowJsonCoder
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.gcp.bigquery import _StreamToBigQuery
from apache_beam.io.gcp.bigquery_read_internal import _BigQueryReadSplit
from apache_beam.io.gcp.bigquery_read_internal import _JsonToDictCoder
from apache_beam.io.gcp.bigquery_read_internal import bigquery_export_destination_uri
from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
@@ -61,6 +62,7 @@
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigQueryTableMatcher
from apache_beam.metrics.metric import Lineage
from apache_beam.options import value_provider
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
@@ -85,9 +87,11 @@
from apitools.base.py.exceptions import HttpError
from apitools.base.py.exceptions import HttpForbiddenError
from google.cloud import bigquery as gcp_bigquery
from google.cloud import bigquery_storage_v1 as bq_storage
from google.api_core import exceptions
except ImportError:
gcp_bigquery = None
bq_storage = None
HttpError = None
HttpForbiddenError = None
exceptions = None
@@ -460,6 +464,8 @@ def test_create_temp_dataset_exception(self, exception_type, error_message):
self.assertIn(error_message, exc.exception.args[0])

@parameterized.expand([
# read without exception
param(responses=[], expected_retries=0),
# first attempt returns a Http 500 blank error and retries
# second attempt returns a Http 408 blank error and retries,
# third attempt passes
@@ -540,6 +546,9 @@ def store_callback(unused_request):
# metadata (numBytes), and once to retrieve the table's schema
# Any additional calls are retries
self.assertEqual(expected_retries, mock_get_table.call_count - 2)
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SOURCE),
set(["bigquery:project.dataset.table"]))

@parameterized.expand([
# first attempt returns a Http 429 with transient reason and retries
@@ -719,6 +728,40 @@ def test_read_export_exception(self, exception_type, error_message):
mock_query_job.assert_called()
self.assertIn(error_message, exc.exception.args[0])

def test_read_direct_lineage(self):
with mock.patch.object(bigquery_tools.BigQueryWrapper,
'_bigquery_client'),\
mock.patch.object(bq_storage.BigQueryReadClient,
'create_read_session'),\
beam.Pipeline() as p:

_ = p | ReadFromBigQuery(
method=ReadFromBigQuery.Method.DIRECT_READ,
table='project:dataset.table')
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SOURCE),
set(["bigquery:project.dataset.table"]))

def test_read_all_lineage(self):
with mock.patch.object(_BigQueryReadSplit, '_export_files') as export, \
beam.Pipeline() as p:

export.return_value = (None, [])

_ = (
p
| beam.Create([
beam.io.ReadFromBigQueryRequest(table='project1:dataset1.table1'),
beam.io.ReadFromBigQueryRequest(table='project2:dataset2.table2')
])
| beam.io.ReadAllFromBigQuery(gcs_location='gs://bucket/tmp'))
self.assertSetEqual(
Lineage.query(p.result.metrics(), Lineage.SOURCE),
set([
'bigquery:project1.dataset1.table1',
'bigquery:project2.dataset2.table2'
]))


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQuerySink(unittest.TestCase):