From f08d8aac0859a2771004cefed9897f3c5d0f896b Mon Sep 17 00:00:00 2001 From: James McMurray Date: Wed, 23 Jun 2021 14:02:48 +0200 Subject: [PATCH 1/3] Patch BQClient to retry on network error At the moment sometimes long jobs can cause the BigQueryClient to timeout since the connection is created at initialisation of the target. This patch retries by re-creating the client object if it fails (up to a retry limit), so that the client will not be timed out when calling `complete()` on the BigQueryTarget after a long job. --- luigi/contrib/bigquery.py | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/luigi/contrib/bigquery.py b/luigi/contrib/bigquery.py index 55fc3b72c8..b4007cd66c 100644 --- a/luigi/contrib/bigquery.py +++ b/luigi/contrib/bigquery.py @@ -120,11 +120,22 @@ class BigQueryClient: https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest """ - def __init__(self, oauth_credentials=None, descriptor='', http_=None): - authenticate_kwargs = gcp.get_authenticate_kwargs(oauth_credentials, http_) - - if descriptor: - self.client = discovery.build_from_document(descriptor, **authenticate_kwargs) + def __init__(self, oauth_credentials=None, descriptor='', http_=None, retry_limit=2): + # Save initialisation arguments in case we need to re-create client + # due to connection timeout + self.oauth_credentials = oauth_credentials + self.descriptor = descriptor + self.http_ = http_ + self.retry_limit = retry_limit + self.retry_count = 0 + + self.__initialise_client() + + def __initialise_client(self): + authenticate_kwargs = gcp.get_authenticate_kwargs(self.oauth_credentials, self.http_) + + if self.descriptor: + self.client = discovery.build_from_document(self.descriptor, **authenticate_kwargs) else: self.client = discovery.build('bigquery', 'v2', cache_discovery=False, **authenticate_kwargs) @@ -147,6 +158,12 @@ def dataset_exists(self, dataset): fetched_location if fetched_location is not None else 'unspecified', dataset.location)) + except (TimeoutError, BrokenPipeError, IOError) as bq_connection_error: + if self.retry_count > self.retry_limit: + raise Exception(f"Exceeded max retries for BigQueryClient connection error: {bq_connection_error}") from bq_connection_error + self.retry_count += 1 + self.__initialise_client() + self.dataset_exists(dataset) except http.HttpError as ex: if ex.resp.status == 404: return False @@ -167,6 +184,12 @@ def table_exists(self, table): self.client.tables().get(projectId=table.project_id, datasetId=table.dataset_id, tableId=table.table_id).execute() + except (TimeoutError, BrokenPipeError, IOError) as bq_connection_error: + if self.retry_count > self.retry_limit: + raise Exception(f"Exceeded max retries for BigQueryClient connection error: {bq_connection_error}") from bq_connection_error + self.retry_count += 1 + self.__initialise_client() + self.table_exists(table) except http.HttpError as ex: if ex.resp.status == 404: return False From df6e92dde2676e90fd0f57a7f9c7599790b7d78f Mon Sep 17 00:00:00 2001 From: James McMurray Date: Thu, 24 Jun 2021 13:52:07 +0200 Subject: [PATCH 2/3] Switch BQ retry to use tenacity decorator Note the use of the after lambda function to re-create the client if the connection fails --- luigi/contrib/bigquery.py | 43 ++++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/luigi/contrib/bigquery.py b/luigi/contrib/bigquery.py index b4007cd66c..147b2fc341 100644 --- a/luigi/contrib/bigquery.py +++ b/luigi/contrib/bigquery.py @@ -21,14 +21,38 @@ import time from luigi.contrib import gcp +from tenacity import retry +from tenacity import retry_if_exception +from tenacity import retry_if_exception_type +from tenacity import wait_exponential +from tenacity import stop_after_attempt + logger = logging.getLogger('luigi-interface') +RETRYABLE_ERRORS = None try: + import httplib2 from googleapiclient import discovery + from googleapiclient import errors from googleapiclient import http except ImportError: logger.warning('BigQuery module imported, but google-api-python-client is ' 'not installed. Any BigQuery task will fail') +else: + RETRYABLE_ERRORS = (httplib2.HttpLib2Error, IOError, TimeoutError, BrokenPipeError) + + +# Retry configurations. For more details, see https://tenacity.readthedocs.io/en/latest/ +def is_error_5xx(err): + return isinstance(err, errors.HttpError) and err.resp.status >= 500 + + +bq_retry = retry(retry=(retry_if_exception(is_error_5xx) | retry_if_exception_type(RETRYABLE_ERRORS)), + wait=wait_exponential(multiplier=1, min=1, max=10), + stop=stop_after_attempt(3), + reraise=True, + after=lambda x: x.args[0].__initialise_client() + ) class CreateDisposition: @@ -120,14 +144,12 @@ class BigQueryClient: https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest """ - def __init__(self, oauth_credentials=None, descriptor='', http_=None, retry_limit=2): + def __init__(self, oauth_credentials=None, descriptor='', http_=None): # Save initialisation arguments in case we need to re-create client # due to connection timeout self.oauth_credentials = oauth_credentials self.descriptor = descriptor self.http_ = http_ - self.retry_limit = retry_limit - self.retry_count = 0 self.__initialise_client() @@ -139,6 +161,7 @@ def __initialise_client(self): else: self.client = discovery.build('bigquery', 'v2', cache_discovery=False, **authenticate_kwargs) + @bq_retry def dataset_exists(self, dataset): """Returns whether the given dataset exists. If regional location is specified for the dataset, that is also checked @@ -157,13 +180,6 @@ def dataset_exists(self, dataset): raise Exception('''Dataset already exists with regional location {}. Can't use {}.'''.format( fetched_location if fetched_location is not None else 'unspecified', dataset.location)) - - except (TimeoutError, BrokenPipeError, IOError) as bq_connection_error: - if self.retry_count > self.retry_limit: - raise Exception(f"Exceeded max retries for BigQueryClient connection error: {bq_connection_error}") from bq_connection_error - self.retry_count += 1 - self.__initialise_client() - self.dataset_exists(dataset) except http.HttpError as ex: if ex.resp.status == 404: return False @@ -171,6 +187,7 @@ def dataset_exists(self, dataset): return True + @bq_retry def table_exists(self, table): """Returns whether the given table exists. @@ -184,12 +201,6 @@ def table_exists(self, table): self.client.tables().get(projectId=table.project_id, datasetId=table.dataset_id, tableId=table.table_id).execute() - except (TimeoutError, BrokenPipeError, IOError) as bq_connection_error: - if self.retry_count > self.retry_limit: - raise Exception(f"Exceeded max retries for BigQueryClient connection error: {bq_connection_error}") from bq_connection_error - self.retry_count += 1 - self.__initialise_client() - self.table_exists(table) except http.HttpError as ex: if ex.resp.status == 404: return False From 160404193ef4008932653aa6bfe3fd5fddb877e5 Mon Sep 17 00:00:00 2001 From: James McMurray Date: Mon, 28 Jun 2021 10:50:19 +0200 Subject: [PATCH 3/3] Fix indentation --- luigi/contrib/bigquery.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/luigi/contrib/bigquery.py b/luigi/contrib/bigquery.py index 147b2fc341..9840a7c247 100644 --- a/luigi/contrib/bigquery.py +++ b/luigi/contrib/bigquery.py @@ -48,11 +48,11 @@ def is_error_5xx(err): bq_retry = retry(retry=(retry_if_exception(is_error_5xx) | retry_if_exception_type(RETRYABLE_ERRORS)), - wait=wait_exponential(multiplier=1, min=1, max=10), - stop=stop_after_attempt(3), - reraise=True, - after=lambda x: x.args[0].__initialise_client() - ) + wait=wait_exponential(multiplier=1, min=1, max=10), + stop=stop_after_attempt(3), + reraise=True, + after=lambda x: x.args[0].__initialise_client() + ) class CreateDisposition: