Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch BigQueryClient to retry on network error #3088

Merged
merged 4 commits into from
Oct 13, 2021
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions luigi/contrib/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,38 @@
import time
from luigi.contrib import gcp

from tenacity import retry
from tenacity import retry_if_exception
from tenacity import retry_if_exception_type
from tenacity import wait_exponential
from tenacity import stop_after_attempt

logger = logging.getLogger('luigi-interface')

RETRYABLE_ERRORS = None
try:
import httplib2
from googleapiclient import discovery
from googleapiclient import errors
from googleapiclient import http
except ImportError:
logger.warning('BigQuery module imported, but google-api-python-client is '
'not installed. Any BigQuery task will fail')
else:
RETRYABLE_ERRORS = (httplib2.HttpLib2Error, IOError, TimeoutError, BrokenPipeError)


# Retry configurations. For more details, see https://tenacity.readthedocs.io/en/latest/
def is_error_5xx(err):
return isinstance(err, errors.HttpError) and err.resp.status >= 500


bq_retry = retry(retry=(retry_if_exception(is_error_5xx) | retry_if_exception_type(RETRYABLE_ERRORS)),
wait=wait_exponential(multiplier=1, min=1, max=10),
stop=stop_after_attempt(3),
reraise=True,
after=lambda x: x.args[0].__initialise_client()
)


class CreateDisposition:
Expand Down Expand Up @@ -121,13 +145,23 @@ class BigQueryClient:
"""

def __init__(self, oauth_credentials=None, descriptor='', http_=None):
authenticate_kwargs = gcp.get_authenticate_kwargs(oauth_credentials, http_)
# Save initialisation arguments in case we need to re-create client
# due to connection timeout
self.oauth_credentials = oauth_credentials
self.descriptor = descriptor
self.http_ = http_

self.__initialise_client()

if descriptor:
self.client = discovery.build_from_document(descriptor, **authenticate_kwargs)
def __initialise_client(self):
authenticate_kwargs = gcp.get_authenticate_kwargs(self.oauth_credentials, self.http_)

if self.descriptor:
self.client = discovery.build_from_document(self.descriptor, **authenticate_kwargs)
else:
self.client = discovery.build('bigquery', 'v2', cache_discovery=False, **authenticate_kwargs)

@bq_retry
def dataset_exists(self, dataset):
"""Returns whether the given dataset exists.
If regional location is specified for the dataset, that is also checked
Expand All @@ -146,14 +180,14 @@ def dataset_exists(self, dataset):
raise Exception('''Dataset already exists with regional location {}. Can't use {}.'''.format(
fetched_location if fetched_location is not None else 'unspecified',
dataset.location))

except http.HttpError as ex:
if ex.resp.status == 404:
return False
raise

return True

@bq_retry
def table_exists(self, table):
"""Returns whether the given table exists.

Expand Down