Skip to content

Commit

Permalink
Patch BigQueryClient to retry on network error (#3088)
Browse files Browse the repository at this point in the history
* Patch BQClient to retry on network error

At the moment, long jobs can sometimes cause the BigQueryClient to
time out, since the connection is created at initialisation of the target.

This patch retries by re-creating the client object if it fails (up to a
retry limit), so that the client will not be timed out when calling
`complete()` on the BigQueryTarget after a long job.

* Switch BQ retry to use tenacity decorator

Note the use of the `after` lambda function to re-create the client if the
connection fails.

* Fix indentation

Co-authored-by: James McMurray <jmcmurray@spotify.com>
Co-authored-by: Dillon Stadther <dlstadther+github@gmail.com>
  • Loading branch information
3 people authored Oct 13, 2021
1 parent 0c5c53b commit ad5ddc9
Showing 1 changed file with 38 additions and 4 deletions.
42 changes: 38 additions & 4 deletions luigi/contrib/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,38 @@
import time
from luigi.contrib import gcp

from tenacity import retry
from tenacity import retry_if_exception
from tenacity import retry_if_exception_type
from tenacity import wait_exponential
from tenacity import stop_after_attempt

logger = logging.getLogger('luigi-interface')

RETRYABLE_ERRORS = None
try:
import httplib2
from googleapiclient import discovery
from googleapiclient import errors
from googleapiclient import http
except ImportError:
logger.warning('BigQuery module imported, but google-api-python-client is '
'not installed. Any BigQuery task will fail')
else:
RETRYABLE_ERRORS = (httplib2.HttpLib2Error, IOError, TimeoutError, BrokenPipeError)


# Retry configurations. For more details, see https://tenacity.readthedocs.io/en/latest/
def is_error_5xx(err):
    """Return True when ``err`` is an ``errors.HttpError`` whose response
    status is 500 or above; return False for any other exception type.
    """
    if not isinstance(err, errors.HttpError):
        return False
    return err.resp.status >= 500


# Shared retry policy applied to BigQueryClient methods: retry on HTTP errors
# accepted by ``is_error_5xx`` and on the exception types in RETRYABLE_ERRORS,
# with an exponential wait kept between 1 and 10 seconds, stopping after 3
# attempts and re-raising the underlying exception (``reraise=True``).
#
# The ``after`` hook rebuilds the client following a failed attempt so the next
# attempt does not reuse the broken connection.  ``args[0]`` is expected to be
# the BigQueryClient instance (``self`` of the decorated method).  Because this
# lambda lives at module scope, the private method must be addressed by its
# name-mangled form: inside the class body ``__initialise_client`` is stored as
# ``_BigQueryClient__initialise_client``, and a plain
# ``x.args[0].__initialise_client()`` here would raise AttributeError.
bq_retry = retry(
    retry=(retry_if_exception(is_error_5xx) | retry_if_exception_type(RETRYABLE_ERRORS)),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    stop=stop_after_attempt(3),
    reraise=True,
    after=lambda retry_state: retry_state.args[0]._BigQueryClient__initialise_client(),
)


class CreateDisposition:
Expand Down Expand Up @@ -122,13 +146,23 @@ class BigQueryClient:
"""

def __init__(self, oauth_credentials=None, descriptor='', http_=None):
authenticate_kwargs = gcp.get_authenticate_kwargs(oauth_credentials, http_)
# Save initialisation arguments in case we need to re-create client
# due to connection timeout
self.oauth_credentials = oauth_credentials
self.descriptor = descriptor
self.http_ = http_

self.__initialise_client()

if descriptor:
self.client = discovery.build_from_document(descriptor, **authenticate_kwargs)
def __initialise_client(self):
authenticate_kwargs = gcp.get_authenticate_kwargs(self.oauth_credentials, self.http_)

if self.descriptor:
self.client = discovery.build_from_document(self.descriptor, **authenticate_kwargs)
else:
self.client = discovery.build('bigquery', 'v2', cache_discovery=False, **authenticate_kwargs)

@bq_retry
def dataset_exists(self, dataset):
"""Returns whether the given dataset exists.
If regional location is specified for the dataset, that is also checked
Expand All @@ -147,14 +181,14 @@ def dataset_exists(self, dataset):
raise Exception('''Dataset already exists with regional location {}. Can't use {}.'''.format(
fetched_location if fetched_location is not None else 'unspecified',
dataset.location))

except http.HttpError as ex:
if ex.resp.status == 404:
return False
raise

return True

@bq_retry
def table_exists(self, table):
"""Returns whether the given table exists.
Expand Down

0 comments on commit ad5ddc9

Please sign in to comment.