From 1c4d424b3fc8771dab11bce2b4e8725c65446753 Mon Sep 17 00:00:00 2001 From: Jay Feldman Date: Tue, 13 Aug 2024 11:21:27 -0400 Subject: [PATCH 1/6] feat(ingest/bigquery): Add job retries for transient errors --- .../ingestion/source/bigquery_v2/bigquery_schema.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index d73ac46c862ea..1f87fc49f3d0e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -6,6 +6,7 @@ from google.api_core import retry from google.cloud import bigquery, datacatalog_v1 +from google.clould.bigquery import retry as bq_retry from google.cloud.bigquery.table import ( RowIterator, TableListItem, @@ -151,8 +152,16 @@ def __init__( self.datacatalog_client = datacatalog_client def get_query_result(self, query: str) -> RowIterator: + def _should_retry(exc: BaseException) -> bool: + logger.debug( + f"Exception occured for job query. Reason: {exc}" + ) + # Jobs sometimes fail with transient errors. + # This is not currently handled by the python-bigquery client. + # https://github.com/googleapis/python-bigquery/issues/23 + return ("Retrying the job may solve the problem" in str(exc)) logger.debug(f"Query : {query}") - resp = self.bq_client.query(query) + resp = self.bq_client.query(query, job_retry=retry.Retry(predicate=lambda exc: (bq_retry.DEFAULT_JOB_RETRY._predicate(exc) or _should_retry(exc)), deadline=bq_retry.DEFAULT_JOB_RETRY._deadline)) return resp.result() def get_projects(self, max_results_per_page: int = 100) -> List[BigqueryProject]: From 7c8fb54e25f32bfb78f6fc808d7a460d00e276e4 Mon Sep 17 00:00:00 2001 From: Jay Feldman Date: Tue, 13 Aug 2024 11:49:40 -0400 Subject: [PATCH 2/6] Fix lint --- .../source/bigquery_v2/bigquery_schema.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 1f87fc49f3d0e..d533fa9fd3284 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -6,13 +6,13 @@ from google.api_core import retry from google.cloud import bigquery, datacatalog_v1 -from google.clould.bigquery import retry as bq_retry from google.cloud.bigquery.table import ( RowIterator, TableListItem, TimePartitioning, TimePartitioningType, ) +from google.clould.bigquery import retry as bq_retry from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier from datahub.ingestion.source.bigquery_v2.bigquery_helper import parse_labels @@ -153,15 +153,22 @@ def __init__( def get_query_result(self, query: str) -> RowIterator: def _should_retry(exc: BaseException) -> bool: - logger.debug( - f"Exception occured for job query. Reason: {exc}" - ) + logger.debug(f"Exception occured for job query. Reason: {exc}") # Jobs sometimes fail with transient errors. # This is not currently handled by the python-bigquery client. # https://github.com/googleapis/python-bigquery/issues/23 - return ("Retrying the job may solve the problem" in str(exc)) + return "Retrying the job may solve the problem" in str(exc) + logger.debug(f"Query : {query}") - resp = self.bq_client.query(query, job_retry=retry.Retry(predicate=lambda exc: (bq_retry.DEFAULT_JOB_RETRY._predicate(exc) or _should_retry(exc)), deadline=bq_retry.DEFAULT_JOB_RETRY._deadline)) + resp = self.bq_client.query( + query, + job_retry=retry.Retry( + predicate=lambda exc: ( + bq_retry.DEFAULT_JOB_RETRY._predicate(exc) or _should_retry(exc) + ), + deadline=bq_retry.DEFAULT_JOB_RETRY._deadline, + ), + ) return resp.result() def get_projects(self, max_results_per_page: int = 100) -> List[BigqueryProject]: From a79988ff6741b2c65a41ed3b3c436f366bbbb65f Mon Sep 17 00:00:00 2001 From: Jay Feldman Date: Tue, 13 Aug 2024 12:02:47 -0400 Subject: [PATCH 3/6] Fix syntax --- .../src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index d533fa9fd3284..6c31167a897af 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -157,7 +157,7 @@ def _should_retry(exc: BaseException) -> bool: # Jobs sometimes fail with transient errors. # This is not currently handled by the python-bigquery client. # https://github.com/googleapis/python-bigquery/issues/23 - return "Retrying the job may solve the problem" in str(exc) + return ("Retrying the job may solve the problem" in str(exc)) logger.debug(f"Query : {query}") resp = self.bq_client.query( From 3c1bcd2e8e5ead3cf5c6a2ab86155ce1afe2039e Mon Sep 17 00:00:00 2001 From: Jay Feldman Date: Tue, 13 Aug 2024 12:09:35 -0400 Subject: [PATCH 4/6] Fix typo --- .../src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 6c31167a897af..ae018db0466d9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -12,7 +12,7 @@ TimePartitioning, TimePartitioningType, ) -from google.clould.bigquery import retry as bq_retry +from google.cloud.bigquery import retry as bq_retry from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier from datahub.ingestion.source.bigquery_v2.bigquery_helper import parse_labels From c566135017359bb0ad9d15def041510a599baf7c Mon Sep 17 00:00:00 2001 From: Jay Feldman Date: Tue, 13 Aug 2024 12:29:43 -0400 Subject: [PATCH 5/6] Fix lint --- .../datahub/ingestion/source/bigquery_v2/bigquery_schema.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index ae018db0466d9..3b6cca59d8289 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -6,13 +6,13 @@ from google.api_core import retry from google.cloud import bigquery, datacatalog_v1 +from google.cloud.bigquery import retry as bq_retry from google.cloud.bigquery.table import ( RowIterator, TableListItem, TimePartitioning, TimePartitioningType, ) -from google.cloud.bigquery import retry as bq_retry from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier from datahub.ingestion.source.bigquery_v2.bigquery_helper import parse_labels @@ -157,7 +157,7 @@ def _should_retry(exc: BaseException) -> bool: # Jobs sometimes fail with transient errors. # This is not currently handled by the python-bigquery client. # https://github.com/googleapis/python-bigquery/issues/23 - return ("Retrying the job may solve the problem" in str(exc)) + return "Retrying the job may solve the problem" in str(exc) logger.debug(f"Query : {query}") resp = self.bq_client.query( From f0066a6362dc355475e76168fa9344488d198f75 Mon Sep 17 00:00:00 2001 From: Gabe Lyons Date: Mon, 26 Aug 2024 11:54:40 -0700 Subject: [PATCH 6/6] fixing import formatting --- .../src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 6301c38981586..f248533edec8d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -5,8 +5,8 @@ from typing import Any, Dict, Iterable, Iterator, List, Optional from google.api_core import retry -from google.cloud.bigquery import retry as bq_retry from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3 +from google.cloud.bigquery import retry as bq_retry from google.cloud.bigquery.table import ( RowIterator, TableListItem,