diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
index ba8462a6f5ff32..f248533edec8d7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py
@@ -6,6 +6,7 @@
 
 from google.api_core import retry
 from google.cloud import bigquery, datacatalog_v1, resourcemanager_v3
+from google.cloud.bigquery import retry as bq_retry
 from google.cloud.bigquery.table import (
     RowIterator,
     TableListItem,
@@ -155,8 +156,23 @@ def __init__(
         self.datacatalog_client = datacatalog_client
 
     def get_query_result(self, query: str) -> RowIterator:
+        def _should_retry(exc: BaseException) -> bool:
+            logger.debug(f"Exception occurred for job query. Reason: {exc}")
+            # Jobs sometimes fail with transient errors.
+            # This is not currently handled by the python-bigquery client.
+            # https://github.com/googleapis/python-bigquery/issues/23
+            return "Retrying the job may solve the problem" in str(exc)
+
         logger.debug(f"Query : {query}")
-        resp = self.bq_client.query(query)
+        resp = self.bq_client.query(
+            query,
+            job_retry=retry.Retry(
+                predicate=lambda exc: (
+                    bq_retry.DEFAULT_JOB_RETRY._predicate(exc) or _should_retry(exc)
+                ),
+                deadline=bq_retry.DEFAULT_JOB_RETRY._deadline,
+            ),
+        )
         return resp.result()
 
     def get_projects(self, max_results_per_page: int = 100) -> List[BigqueryProject]: