From 26dd7299ff6683628925d1fa843253e9bca26fa9 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Thu, 26 Dec 2019 23:12:44 +0000
Subject: [PATCH] feat(bigquery): add timeout parameter to client's and job's public methods (#10002)

* Reverse argument order in job.result()

This is for internal consistency with other methods such as reload().

* Add TODO reminder to _AsyncJob.cancel() method

* Add timeout argument to public methods

An exception is the Client.load_table_from_file() method, and the methods that depend on it, because adding a timeout requires changes in the google-resumable-media dependency.

* Explicitly test timeout parameter

* Split timeout in multi-request methods

If a method makes multiple requests and is given a timeout, that timeout should represent the total allowed time for all requests combined (see the TimeoutGuard sketch below).

* Fix minor styling issue

* Add timeout with retry test for Client._call_api()
---
 bigquery/google/cloud/bigquery/client.py | 322 +++++++++++---
 bigquery/google/cloud/bigquery/job.py | 112 +++--
 bigquery/noxfile.py | 2 +-
 bigquery/tests/unit/test_client.py | 510 +++++++++++++++++++----
 bigquery/tests/unit/test_job.py | 168 +++++++-
 5 files changed, 910 insertions(+), 204 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index 5fd7bceea973..5707d57cdb62 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -22,6 +22,7 @@
 except ImportError:  # Python 2.7
     import collections as collections_abc
 
+import concurrent.futures
 import copy
 import functools
 import gzip
@@ -47,6 +48,7 @@
 import google.api_core.client_options
 import google.api_core.exceptions
 from google.api_core import page_iterator
+from google.auth.transport.requests import TimeoutGuard
 import google.cloud._helpers
 from google.cloud import exceptions
 from google.cloud.client import ClientWithProject
@@ -206,7 +208,7 @@ def close(self):
         self._http._auth_request.session.close()
         self._http.close()
 
-    def get_service_account_email(self, project=None):
+    def get_service_account_email(self, project=None, timeout=None):
         """Get the email address of the project's BigQuery service account
 
         Note:
@@ -217,6 +219,8 @@ def get_service_account_email(self, project=None):
             project (str, optional):
                 Project ID to use for retrieving service account email.
                 Defaults to the client's project.
+            timeout (Optional[float]):
+                The number of seconds to wait for the API response.
 
         Returns:
             str: service account email address
@@ -232,10 +236,16 @@ def get_service_account_email(self, project=None):
         if project is None:
             project = self.project
         path = "/projects/%s/serviceAccount" % (project,)
-        api_response = self._connection.api_request(method="GET", path=path)
+
+        # TODO: call through self._call_api() and allow passing in a retry?
+        api_response = self._connection.api_request(
+            method="GET", path=path, timeout=timeout
+        )
         return api_response["email"]
 
-    def list_projects(self, max_results=None, page_token=None, retry=DEFAULT_RETRY):
+    def list_projects(
+        self, max_results=None, page_token=None, retry=DEFAULT_RETRY, timeout=None
+    ):
         """List projects for the project associated with this client.
 
         See
@@ -256,6 +266,10 @@ def list_projects(self, max_results=None, page_token=None, retry=DEFAULT_RETRY):
             retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
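A hypothetical caller's view of the parameter added above; the project ID is illustrative and default credentials are assumed:

    from google.cloud import bigquery

    client = bigquery.Client(project="my-project")  # illustrative project ID

    # The transport gets up to 7.5 seconds per request; on timeout the call
    # fails (or, where a retry object is accepted, the retry takes over).
    email = client.get_service_account_email(timeout=7.5)

    for project in client.list_projects(timeout=7.5):
        print(project.project_id)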
+ Returns: google.api_core.page_iterator.Iterator: Iterator of :class:`~google.cloud.bigquery.client.Project` @@ -263,7 +277,7 @@ def list_projects(self, max_results=None, page_token=None, retry=DEFAULT_RETRY): """ return page_iterator.HTTPIterator( client=self, - api_request=functools.partial(self._call_api, retry), + api_request=functools.partial(self._call_api, retry, timeout=timeout), path="/projects", item_to_value=_item_to_project, items_key="projects", @@ -279,6 +293,7 @@ def list_datasets( max_results=None, page_token=None, retry=DEFAULT_RETRY, + timeout=None, ): """List datasets for the project associated with this client. @@ -307,6 +322,9 @@ def list_datasets( :class:`~google.api_core.page_iterator.HTTPIterator`. retry (google.api_core.retry.Retry): Optional. How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.api_core.page_iterator.Iterator: @@ -325,7 +343,7 @@ def list_datasets( path = "/projects/%s/datasets" % (project,) return page_iterator.HTTPIterator( client=self, - api_request=functools.partial(self._call_api, retry), + api_request=functools.partial(self._call_api, retry, timeout=timeout), path=path, item_to_value=_item_to_dataset, items_key="datasets", @@ -366,7 +384,9 @@ def _create_bqstorage_client(self): credentials=self._credentials ) - def create_dataset(self, dataset, exists_ok=False, retry=DEFAULT_RETRY): + def create_dataset( + self, dataset, exists_ok=False, retry=DEFAULT_RETRY, timeout=None + ): """API call: create the dataset via a POST request. See @@ -386,6 +406,9 @@ def create_dataset(self, dataset, exists_ok=False, retry=DEFAULT_RETRY): errors when creating the dataset. retry (google.api_core.retry.Retry): Optional. How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.dataset.Dataset: @@ -413,14 +436,18 @@ def create_dataset(self, dataset, exists_ok=False, retry=DEFAULT_RETRY): data["location"] = self.location try: - api_response = self._call_api(retry, method="POST", path=path, data=data) + api_response = self._call_api( + retry, method="POST", path=path, data=data, timeout=timeout + ) return Dataset.from_api_repr(api_response) except google.api_core.exceptions.Conflict: if not exists_ok: raise return self.get_dataset(dataset.reference, retry=retry) - def create_routine(self, routine, exists_ok=False, retry=DEFAULT_RETRY): + def create_routine( + self, routine, exists_ok=False, retry=DEFAULT_RETRY, timeout=None + ): """[Beta] Create a routine via a POST request. See @@ -435,6 +462,9 @@ def create_routine(self, routine, exists_ok=False, retry=DEFAULT_RETRY): errors when creating the routine. retry (google.api_core.retry.Retry): Optional. How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. 
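The create methods in this patch share one Conflict-handling shape: attempt the POST with the caller's timeout, and on a 409 fall back to a GET when ``exists_ok`` is set. A minimal standalone sketch of that control flow (both callables are stand-ins, not library code):

    import google.api_core.exceptions

    def create_or_fetch(create_call, get_call, exists_ok=False):
        # POST first; on Conflict either re-raise or fetch what already exists.
        try:
            return create_call()
        except google.api_core.exceptions.Conflict:
            if not exists_ok:
                raise
            return get_call()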
Returns: google.cloud.bigquery.routine.Routine: @@ -447,7 +477,7 @@ def create_routine(self, routine, exists_ok=False, retry=DEFAULT_RETRY): resource = routine.to_api_repr() try: api_response = self._call_api( - retry, method="POST", path=path, data=resource + retry, method="POST", path=path, data=resource, timeout=timeout ) return Routine.from_api_repr(api_response) except google.api_core.exceptions.Conflict: @@ -455,7 +485,7 @@ def create_routine(self, routine, exists_ok=False, retry=DEFAULT_RETRY): raise return self.get_routine(routine.reference, retry=retry) - def create_table(self, table, exists_ok=False, retry=DEFAULT_RETRY): + def create_table(self, table, exists_ok=False, retry=DEFAULT_RETRY, timeout=None): """API call: create a table via a PUT request See @@ -476,6 +506,9 @@ def create_table(self, table, exists_ok=False, retry=DEFAULT_RETRY): errors when creating the table. retry (google.api_core.retry.Retry): Optional. How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.table.Table: @@ -486,7 +519,9 @@ def create_table(self, table, exists_ok=False, retry=DEFAULT_RETRY): path = "/projects/%s/datasets/%s/tables" % (table.project, table.dataset_id) data = table.to_api_repr() try: - api_response = self._call_api(retry, method="POST", path=path, data=data) + api_response = self._call_api( + retry, method="POST", path=path, data=data, timeout=timeout + ) return Table.from_api_repr(api_response) except google.api_core.exceptions.Conflict: if not exists_ok: @@ -499,7 +534,7 @@ def _call_api(self, retry, **kwargs): call = retry(call) return call() - def get_dataset(self, dataset_ref, retry=DEFAULT_RETRY): + def get_dataset(self, dataset_ref, retry=DEFAULT_RETRY, timeout=None): """Fetch the dataset referenced by ``dataset_ref`` Args: @@ -513,6 +548,9 @@ def get_dataset(self, dataset_ref, retry=DEFAULT_RETRY): :func:`~google.cloud.bigquery.dataset.DatasetReference.from_string`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.dataset.Dataset: @@ -523,10 +561,12 @@ def get_dataset(self, dataset_ref, retry=DEFAULT_RETRY): dataset_ref, default_project=self.project ) - api_response = self._call_api(retry, method="GET", path=dataset_ref.path) + api_response = self._call_api( + retry, method="GET", path=dataset_ref.path, timeout=timeout + ) return Dataset.from_api_repr(api_response) - def get_model(self, model_ref, retry=DEFAULT_RETRY): + def get_model(self, model_ref, retry=DEFAULT_RETRY, timeout=None): """[Beta] Fetch the model referenced by ``model_ref``. Args: @@ -540,6 +580,9 @@ def get_model(self, model_ref, retry=DEFAULT_RETRY): :func:`google.cloud.bigquery.model.ModelReference.from_string`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.model.Model: A ``Model`` instance. 
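``_call_api``, shown a few hunks up, is where ``retry`` and ``timeout`` meet: the timeout is bound into the request callable, and the retry wrapper re-invokes that same callable, so every attempt carries the same per-request timeout. A standalone sketch with a printing stub in place of ``Connection.api_request``:

    import functools

    def api_request(method, path, timeout=None, **kwargs):
        # Stub standing in for Connection.api_request.
        print("%s %s (timeout=%s)" % (method, path, timeout))
        return {}

    def call_api(retry, **kwargs):
        # Same shape as Client._call_api in this patch.
        call = functools.partial(api_request, **kwargs)
        if retry:
            call = retry(call)  # each retried attempt reuses the bound timeout
        return call()

    call_api(None, method="GET", path="/projects/my-project/datasets", timeout=7.5)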
@@ -549,10 +592,12 @@ def get_model(self, model_ref, retry=DEFAULT_RETRY): model_ref, default_project=self.project ) - api_response = self._call_api(retry, method="GET", path=model_ref.path) + api_response = self._call_api( + retry, method="GET", path=model_ref.path, timeout=timeout + ) return Model.from_api_repr(api_response) - def get_routine(self, routine_ref, retry=DEFAULT_RETRY): + def get_routine(self, routine_ref, retry=DEFAULT_RETRY, timeout=None): """[Beta] Get the routine referenced by ``routine_ref``. Args: @@ -567,6 +612,9 @@ def get_routine(self, routine_ref, retry=DEFAULT_RETRY): :func:`google.cloud.bigquery.routine.RoutineReference.from_string`. retry (google.api_core.retry.Retry): (Optional) How to retry the API call. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.routine.Routine: @@ -577,10 +625,12 @@ def get_routine(self, routine_ref, retry=DEFAULT_RETRY): routine_ref, default_project=self.project ) - api_response = self._call_api(retry, method="GET", path=routine_ref.path) + api_response = self._call_api( + retry, method="GET", path=routine_ref.path, timeout=timeout + ) return Routine.from_api_repr(api_response) - def get_table(self, table, retry=DEFAULT_RETRY): + def get_table(self, table, retry=DEFAULT_RETRY, timeout=None): """Fetch the table referenced by ``table``. Args: @@ -595,16 +645,21 @@ def get_table(self, table, retry=DEFAULT_RETRY): :func:`google.cloud.bigquery.table.TableReference.from_string`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.table.Table: A ``Table`` instance. """ table_ref = _table_arg_to_table_ref(table, default_project=self.project) - api_response = self._call_api(retry, method="GET", path=table_ref.path) + api_response = self._call_api( + retry, method="GET", path=table_ref.path, timeout=timeout + ) return Table.from_api_repr(api_response) - def update_dataset(self, dataset, fields, retry=DEFAULT_RETRY): + def update_dataset(self, dataset, fields, retry=DEFAULT_RETRY, timeout=None): """Change some fields of a dataset. Use ``fields`` to specify which fields to update. At least one field @@ -625,6 +680,9 @@ def update_dataset(self, dataset, fields, retry=DEFAULT_RETRY): The properties of ``dataset`` to change (e.g. "friendly_name"). retry (google.api_core.retry.Retry, optional): How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.dataset.Dataset: @@ -636,11 +694,16 @@ def update_dataset(self, dataset, fields, retry=DEFAULT_RETRY): else: headers = None api_response = self._call_api( - retry, method="PATCH", path=dataset.path, data=partial, headers=headers + retry, + method="PATCH", + path=dataset.path, + data=partial, + headers=headers, + timeout=timeout, ) return Dataset.from_api_repr(api_response) - def update_model(self, model, fields, retry=DEFAULT_RETRY): + def update_model(self, model, fields, retry=DEFAULT_RETRY, timeout=None): """[Beta] Change some fields of a model. Use ``fields`` to specify which fields to update. At least one field @@ -660,6 +723,9 @@ def update_model(self, model, fields, retry=DEFAULT_RETRY): properties (e.g. "friendly_name"). retry (google.api_core.retry.Retry): (Optional) A description of how to retry the API call. 
+ timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.model.Model: @@ -671,11 +737,16 @@ def update_model(self, model, fields, retry=DEFAULT_RETRY): else: headers = None api_response = self._call_api( - retry, method="PATCH", path=model.path, data=partial, headers=headers + retry, + method="PATCH", + path=model.path, + data=partial, + headers=headers, + timeout=timeout, ) return Model.from_api_repr(api_response) - def update_routine(self, routine, fields, retry=DEFAULT_RETRY): + def update_routine(self, routine, fields, retry=DEFAULT_RETRY, timeout=None): """[Beta] Change some fields of a routine. Use ``fields`` to specify which fields to update. At least one field @@ -702,6 +773,9 @@ def update_routine(self, routine, fields, retry=DEFAULT_RETRY): (e.g. ``type_``). retry (google.api_core.retry.Retry): (Optional) A description of how to retry the API call. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.routine.Routine: @@ -717,11 +791,16 @@ def update_routine(self, routine, fields, retry=DEFAULT_RETRY): partial["routineReference"] = routine.reference.to_api_repr() api_response = self._call_api( - retry, method="PUT", path=routine.path, data=partial, headers=headers + retry, + method="PUT", + path=routine.path, + data=partial, + headers=headers, + timeout=timeout, ) return Routine.from_api_repr(api_response) - def update_table(self, table, fields, retry=DEFAULT_RETRY): + def update_table(self, table, fields, retry=DEFAULT_RETRY, timeout=None): """Change some fields of a table. Use ``fields`` to specify which fields to update. At least one field @@ -741,6 +820,9 @@ def update_table(self, table, fields, retry=DEFAULT_RETRY): properties (e.g. "friendly_name"). retry (google.api_core.retry.Retry): (Optional) A description of how to retry the API call. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.table.Table: @@ -752,12 +834,22 @@ def update_table(self, table, fields, retry=DEFAULT_RETRY): else: headers = None api_response = self._call_api( - retry, method="PATCH", path=table.path, data=partial, headers=headers + retry, + method="PATCH", + path=table.path, + data=partial, + headers=headers, + timeout=timeout, ) return Table.from_api_repr(api_response) def list_models( - self, dataset, max_results=None, page_token=None, retry=DEFAULT_RETRY + self, + dataset, + max_results=None, + page_token=None, + retry=DEFAULT_RETRY, + timeout=None, ): """[Beta] List models in the dataset. @@ -786,6 +878,9 @@ def list_models( :class:`~google.api_core.page_iterator.HTTPIterator`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. 
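Because the list methods return a lazy ``HTTPIterator``, nothing is requested until iteration begins, and the timeout bound into the partial above then applies to each page request individually. For example (IDs illustrative):

    from google.cloud import bigquery

    client = bigquery.Client(project="my-project")
    for model in client.list_models("my-project.my_dataset", timeout=7.5):
        # Every page fetched during iteration gets the same 7.5-second cap.
        print(model.model_id)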
Returns: google.api_core.page_iterator.Iterator: @@ -804,7 +899,7 @@ def list_models( path = "%s/models" % dataset.path result = page_iterator.HTTPIterator( client=self, - api_request=functools.partial(self._call_api, retry), + api_request=functools.partial(self._call_api, retry, timeout=timeout), path=path, item_to_value=_item_to_model, items_key="models", @@ -815,7 +910,12 @@ def list_models( return result def list_routines( - self, dataset, max_results=None, page_token=None, retry=DEFAULT_RETRY + self, + dataset, + max_results=None, + page_token=None, + retry=DEFAULT_RETRY, + timeout=None, ): """[Beta] List routines in the dataset. @@ -844,6 +944,9 @@ def list_routines( :class:`~google.api_core.page_iterator.HTTPIterator`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.api_core.page_iterator.Iterator: @@ -862,7 +965,7 @@ def list_routines( path = "{}/routines".format(dataset.path) result = page_iterator.HTTPIterator( client=self, - api_request=functools.partial(self._call_api, retry), + api_request=functools.partial(self._call_api, retry, timeout=timeout), path=path, item_to_value=_item_to_routine, items_key="routines", @@ -873,7 +976,12 @@ def list_routines( return result def list_tables( - self, dataset, max_results=None, page_token=None, retry=DEFAULT_RETRY + self, + dataset, + max_results=None, + page_token=None, + retry=DEFAULT_RETRY, + timeout=None, ): """List tables in the dataset. @@ -902,6 +1010,9 @@ def list_tables( :class:`~google.api_core.page_iterator.HTTPIterator`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.api_core.page_iterator.Iterator: @@ -920,7 +1031,7 @@ def list_tables( path = "%s/tables" % dataset.path result = page_iterator.HTTPIterator( client=self, - api_request=functools.partial(self._call_api, retry), + api_request=functools.partial(self._call_api, retry, timeout=timeout), path=path, item_to_value=_item_to_table, items_key="tables", @@ -931,7 +1042,12 @@ def list_tables( return result def delete_dataset( - self, dataset, delete_contents=False, retry=DEFAULT_RETRY, not_found_ok=False + self, + dataset, + delete_contents=False, + retry=DEFAULT_RETRY, + timeout=None, + not_found_ok=False, ): """Delete a dataset. @@ -954,6 +1070,9 @@ def delete_dataset( Default is False. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. not_found_ok (bool): Defaults to ``False``. If ``True``, ignore "not found" errors when deleting the dataset. @@ -972,13 +1091,19 @@ def delete_dataset( try: self._call_api( - retry, method="DELETE", path=dataset.path, query_params=params + retry, + method="DELETE", + path=dataset.path, + query_params=params, + timeout=timeout, ) except google.api_core.exceptions.NotFound: if not not_found_ok: raise - def delete_model(self, model, retry=DEFAULT_RETRY, not_found_ok=False): + def delete_model( + self, model, retry=DEFAULT_RETRY, timeout=None, not_found_ok=False + ): """[Beta] Delete a model See @@ -996,6 +1121,9 @@ def delete_model(self, model, retry=DEFAULT_RETRY, not_found_ok=False): :func:`google.cloud.bigquery.model.ModelReference.from_string`. 
retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. not_found_ok (bool): Defaults to ``False``. If ``True``, ignore "not found" errors when deleting the model. @@ -1007,12 +1135,14 @@ def delete_model(self, model, retry=DEFAULT_RETRY, not_found_ok=False): raise TypeError("model must be a Model or a ModelReference") try: - self._call_api(retry, method="DELETE", path=model.path) + self._call_api(retry, method="DELETE", path=model.path, timeout=timeout) except google.api_core.exceptions.NotFound: if not not_found_ok: raise - def delete_routine(self, routine, retry=DEFAULT_RETRY, not_found_ok=False): + def delete_routine( + self, routine, retry=DEFAULT_RETRY, timeout=None, not_found_ok=False + ): """[Beta] Delete a routine. See @@ -1030,6 +1160,9 @@ def delete_routine(self, routine, retry=DEFAULT_RETRY, not_found_ok=False): :func:`google.cloud.bigquery.routine.RoutineReference.from_string`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. not_found_ok (bool): Defaults to ``False``. If ``True``, ignore "not found" errors when deleting the routine. @@ -1043,12 +1176,14 @@ def delete_routine(self, routine, retry=DEFAULT_RETRY, not_found_ok=False): raise TypeError("routine must be a Routine or a RoutineReference") try: - self._call_api(retry, method="DELETE", path=routine.path) + self._call_api(retry, method="DELETE", path=routine.path, timeout=timeout) except google.api_core.exceptions.NotFound: if not not_found_ok: raise - def delete_table(self, table, retry=DEFAULT_RETRY, not_found_ok=False): + def delete_table( + self, table, retry=DEFAULT_RETRY, timeout=None, not_found_ok=False + ): """Delete a table See @@ -1066,6 +1201,9 @@ def delete_table(self, table, retry=DEFAULT_RETRY, not_found_ok=False): :func:`google.cloud.bigquery.table.TableReference.from_string`. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. not_found_ok (bool): Defaults to ``False``. If ``True``, ignore "not found" errors when deleting the table. @@ -1075,7 +1213,7 @@ def delete_table(self, table, retry=DEFAULT_RETRY, not_found_ok=False): raise TypeError("Unable to get TableReference for table '{}'".format(table)) try: - self._call_api(retry, method="DELETE", path=table.path) + self._call_api(retry, method="DELETE", path=table.path, timeout=timeout) except google.api_core.exceptions.NotFound: if not not_found_ok: raise @@ -1098,7 +1236,7 @@ def _get_query_results( location (str): Location of the query job. timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport - before retrying the HTTP request. + before using ``retry``. Returns: google.cloud.bigquery.query._QueryResults: @@ -1155,7 +1293,9 @@ def job_from_resource(self, resource): return job.QueryJob.from_api_repr(resource, self) return job.UnknownJob.from_api_repr(resource, self) - def get_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY): + def get_job( + self, job_id, project=None, location=None, retry=DEFAULT_RETRY, timeout=None + ): """Fetch a job for the project associated with this client. 
See @@ -1171,6 +1311,9 @@ def get_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY): location (str): Location where the job was run. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: Union[ \ @@ -1195,12 +1338,14 @@ def get_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY): path = "/projects/{}/jobs/{}".format(project, job_id) resource = self._call_api( - retry, method="GET", path=path, query_params=extra_params + retry, method="GET", path=path, query_params=extra_params, timeout=timeout ) return self.job_from_resource(resource) - def cancel_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY): + def cancel_job( + self, job_id, project=None, location=None, retry=DEFAULT_RETRY, timeout=None + ): """Attempt to cancel a job from a job ID. See @@ -1216,6 +1361,9 @@ def cancel_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY): location (str): Location where the job was run. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: Union[ \ @@ -1240,7 +1388,7 @@ def cancel_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY): path = "/projects/{}/jobs/{}/cancel".format(project, job_id) resource = self._call_api( - retry, method="POST", path=path, query_params=extra_params + retry, method="POST", path=path, query_params=extra_params, timeout=timeout ) return self.job_from_resource(resource["job"]) @@ -1254,6 +1402,7 @@ def list_jobs( all_users=None, state_filter=None, retry=DEFAULT_RETRY, + timeout=None, min_creation_time=None, max_creation_time=None, ): @@ -1290,6 +1439,9 @@ def list_jobs( * ``"running"`` retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. min_creation_time (Optional[datetime.datetime]): Min value for job creation time. If set, only jobs created after or at this timestamp are returned. If the datetime has @@ -1329,7 +1481,7 @@ def list_jobs( path = "/projects/%s/jobs" % (project,) return page_iterator.HTTPIterator( client=self, - api_request=functools.partial(self._call_api, retry), + api_request=functools.partial(self._call_api, retry, timeout=timeout), path=path, item_to_value=_item_to_job, items_key="jobs", @@ -1348,6 +1500,7 @@ def load_table_from_uri( project=None, job_config=None, retry=DEFAULT_RETRY, + timeout=None, ): """Starts a job for loading data into a table from CloudStorage. @@ -1384,6 +1537,9 @@ def load_table_from_uri( (Optional) Extra configuration options for the job. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.job.LoadJob: A new load job. @@ -1413,7 +1569,7 @@ def load_table_from_uri( _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig) load_job = job.LoadJob(job_ref, source_uris, destination, self, job_config) - load_job._begin(retry=retry) + load_job._begin(retry=retry, timeout=timeout) return load_job @@ -1918,6 +2074,7 @@ def copy_table( project=None, job_config=None, retry=DEFAULT_RETRY, + timeout=None, ): """Copy one or more tables to another table. 
@@ -1961,6 +2118,9 @@ def copy_table( (Optional) Extra configuration options for the job. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.job.CopyJob: A new copy job instance. @@ -2004,7 +2164,7 @@ def copy_table( copy_job = job.CopyJob( job_ref, sources, destination, client=self, job_config=job_config ) - copy_job._begin(retry=retry) + copy_job._begin(retry=retry, timeout=timeout) return copy_job @@ -2018,6 +2178,7 @@ def extract_table( project=None, job_config=None, retry=DEFAULT_RETRY, + timeout=None, ): """Start a job to extract a table into Cloud Storage files. @@ -2052,6 +2213,9 @@ def extract_table( (Optional) Extra configuration options for the job. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Args: source (google.cloud.bigquery.table.TableReference): table to be extracted. @@ -2086,7 +2250,7 @@ def extract_table( extract_job = job.ExtractJob( job_ref, source, destination_uris, client=self, job_config=job_config ) - extract_job._begin(retry=retry) + extract_job._begin(retry=retry, timeout=timeout) return extract_job @@ -2099,6 +2263,7 @@ def query( location=None, project=None, retry=DEFAULT_RETRY, + timeout=None, ): """Run a SQL query. @@ -2129,6 +2294,9 @@ def query( to the client's project. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: google.cloud.bigquery.job.QueryJob: A new query job instance. @@ -2169,7 +2337,7 @@ def query( job_ref = job._JobReference(job_id, project=project, location=location) query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config) - query_job._begin(retry=retry) + query_job._begin(retry=retry, timeout=timeout) return query_job @@ -2290,6 +2458,7 @@ def insert_rows_json( ignore_unknown_values=None, template_suffix=None, retry=DEFAULT_RETRY, + timeout=None, ): """Insert rows into a table without applying local type conversions. @@ -2326,6 +2495,9 @@ def insert_rows_json( https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Returns: Sequence[Mappings]: @@ -2359,7 +2531,11 @@ def insert_rows_json( # We can always retry, because every row has an insert ID. response = self._call_api( - retry, method="POST", path="%s/insertAll" % table.path, data=data + retry, + method="POST", + path="%s/insertAll" % table.path, + data=data, + timeout=timeout, ) errors = [] @@ -2368,7 +2544,7 @@ def insert_rows_json( return errors - def list_partitions(self, table, retry=DEFAULT_RETRY): + def list_partitions(self, table, retry=DEFAULT_RETRY, timeout=None): """List the partitions in a table. Args: @@ -2380,23 +2556,37 @@ def list_partitions(self, table, retry=DEFAULT_RETRY): The table or reference from which to get partition info retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. 
+ If multiple requests are made under the hood, ``timeout`` is + interpreted as the approximate total time of **all** requests. Returns: List[str]: A list of the partition ids present in the partitioned table """ table = _table_arg_to_table_ref(table, default_project=self.project) - meta_table = self.get_table( - TableReference( - self.dataset(table.dataset_id, project=table.project), - "%s$__PARTITIONS_SUMMARY__" % table.table_id, + + with TimeoutGuard( + timeout, timeout_error_type=concurrent.futures.TimeoutError + ) as guard: + meta_table = self.get_table( + TableReference( + self.dataset(table.dataset_id, project=table.project), + "%s$__PARTITIONS_SUMMARY__" % table.table_id, + ), + retry=retry, + timeout=timeout, ) - ) + timeout = guard.remaining_timeout subset = [col for col in meta_table.schema if col.name == "partition_id"] return [ row[0] - for row in self.list_rows(meta_table, selected_fields=subset, retry=retry) + for row in self.list_rows( + meta_table, selected_fields=subset, retry=retry, timeout=timeout + ) ] def list_rows( @@ -2408,6 +2598,7 @@ def list_rows( start_index=None, page_size=None, retry=DEFAULT_RETRY, + timeout=None, ): """List the rows of the table. @@ -2452,6 +2643,11 @@ def list_rows( to a sensible value set by the API. retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. + If multiple requests are made under the hood, ``timeout`` is + interpreted as the approximate total time of **all** requests. Returns: google.cloud.bigquery.table.RowIterator: @@ -2476,7 +2672,11 @@ def list_rows( # No schema, but no selected_fields. Assume the developer wants all # columns, so get the table resource for them rather than failing. elif len(schema) == 0: - table = self.get_table(table.reference, retry=retry) + with TimeoutGuard( + timeout, timeout_error_type=concurrent.futures.TimeoutError + ) as guard: + table = self.get_table(table.reference, retry=retry, timeout=timeout) + timeout = guard.remaining_timeout schema = table.schema params = {} @@ -2487,7 +2687,7 @@ def list_rows( row_iterator = RowIterator( client=self, - api_request=functools.partial(self._call_api, retry), + api_request=functools.partial(self._call_api, retry, timeout=timeout), path="%s/data" % (table.path,), schema=schema, page_token=page_token, diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index d20e5b5fb11f..34628350c922 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -26,6 +26,7 @@ from six.moves import http_client import google.api_core.future.polling +from google.auth.transport.requests import TimeoutGuard from google.cloud import exceptions from google.cloud.exceptions import NotFound from google.cloud.bigquery.dataset import Dataset @@ -607,7 +608,7 @@ def to_api_repr(self): _build_resource = to_api_repr # backward-compatibility alias - def _begin(self, client=None, retry=DEFAULT_RETRY): + def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None): """API call: begin the job via a POST request See @@ -619,6 +620,9 @@ def _begin(self, client=None, retry=DEFAULT_RETRY): associated with the job object or``NoneType`` retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. 
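``list_partitions`` and ``list_rows`` above apply the multi-request rule from the commit message: a ``TimeoutGuard`` times the guarded block and exposes ``remaining_timeout``, so one user-supplied value budgets the whole sequence of requests. A reduced sketch of the pattern (the two request functions are stand-ins, not library code):

    import concurrent.futures
    import time

    from google.auth.transport.requests import TimeoutGuard

    def first_request(timeout=None):
        time.sleep(0.5)  # pretend this request took half a second

    def second_request(timeout=None):
        print("second request gets timeout=%s" % timeout)

    def two_step(timeout=None):
        # Raises concurrent.futures.TimeoutError if the budget is exhausted.
        with TimeoutGuard(
            timeout, timeout_error_type=concurrent.futures.TimeoutError
        ) as guard:
            first_request(timeout=timeout)
        timeout = guard.remaining_timeout  # original budget minus elapsed time
        second_request(timeout=timeout)

    two_step(timeout=7.5)  # prints approximately: second request gets timeout=7.0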
         Raises:
             ValueError:
                 If the job has already begun.
         """
         client = self._require_client(client)
         path = "/projects/%s/jobs" % (self.project,)
 
         # jobs.insert is idempotent because we ensure that every new
         # job has an ID.
         api_response = client._call_api(
-            retry, method="POST", path=path, data=self.to_api_repr()
+            retry, method="POST", path=path, data=self.to_api_repr(), timeout=timeout
         )
         self._set_properties(api_response)
 
-    def exists(self, client=None, retry=DEFAULT_RETRY):
+    def exists(self, client=None, retry=DEFAULT_RETRY, timeout=None):
         """API call: test for the existence of the job via a GET request
 
         See
         https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
 
         Args:
             client (Optional[google.cloud.bigquery.client.Client]):
                 the client to use. If not passed, falls back to the
                 ``client`` stored on the current dataset.
             retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
 
         Returns:
             bool: Boolean indicating existence of the job.
@@ -661,7 +668,11 @@ def exists(self, client=None, retry=DEFAULT_RETRY):
 
         try:
             client._call_api(
-                retry, method="GET", path=self.path, query_params=extra_params
+                retry,
+                method="GET",
+                path=self.path,
+                query_params=extra_params,
+                timeout=timeout,
             )
         except NotFound:
             return False
@@ -682,7 +693,7 @@ def reload(self, client=None, retry=DEFAULT_RETRY, timeout=None):
             retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
             timeout (Optional[float]):
                 The number of seconds to wait for the underlying HTTP transport
-                before retrying the HTTP request.
+                before using ``retry``.
         """
         client = self._require_client(client)
@@ -699,7 +710,7 @@ def reload(self, client=None, retry=DEFAULT_RETRY, timeout=None):
         )
         self._set_properties(api_response)
 
-    def cancel(self, client=None):
+    def cancel(self, client=None, timeout=None):
         """API call: cancel job via a POST request
 
         See
         https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel
 
         Args:
             client (Optional[google.cloud.bigquery.client.Client]):
                 the client to use. If not passed, falls back to the
                 ``client`` stored on the current dataset.
+            timeout (Optional[float]):
+                The number of seconds to wait for the API response.
 
         Returns:
             bool: Boolean indicating that the cancel request was sent.
@@ -719,8 +732,12 @@ def cancel(self, client=None):
         if self.location:
             extra_params["location"] = self.location
 
+        # TODO: call through client._call_api() and allow passing in a retry?
         api_response = client._connection.api_request(
-            method="POST", path="%s/cancel" % (self.path,), query_params=extra_params
+            method="POST",
+            path="%s/cancel" % (self.path,),
+            query_params=extra_params,
+            timeout=timeout,
         )
         self._set_properties(api_response["job"])
         # The Future interface requires that we return True if the *attempt*
@@ -751,11 +768,14 @@ def _set_future_result(self):
         else:
             self.set_result(self)
 
-    def done(self, retry=DEFAULT_RETRY):
+    def done(self, retry=DEFAULT_RETRY, timeout=None):
         """Refresh the job and check if it is complete.
 
         Args:
             retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
 
         Returns:
             bool: True if the job is complete, False otherwise.
         """
         # Do not refresh if the state is already done, as the job will not
         # change once complete.
if self.state != _DONE_STATE: - self.reload(retry=retry) + self.reload(retry=retry, timeout=timeout) return self.state == _DONE_STATE - def result(self, timeout=None, retry=DEFAULT_RETRY): + def result(self, retry=DEFAULT_RETRY, timeout=None): """Start the job and wait for it to complete and get the result. Args: - timeout (float): - How long (in seconds) to wait for job to complete before raising - a :class:`concurrent.futures.TimeoutError`. - retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. + If multiple requests are made under the hood, ``timeout`` is + interpreted as the approximate total time of **all** requests. Returns: _AsyncJob: This instance. @@ -786,7 +807,11 @@ def result(self, timeout=None, retry=DEFAULT_RETRY): if the job did not complete in the given timeout. """ if self.state is None: - self._begin(retry=retry) + with TimeoutGuard( + timeout, timeout_error_type=concurrent.futures.TimeoutError + ) as guard: + self._begin(retry=retry, timeout=timeout) + timeout = guard.remaining_timeout # TODO: modify PollingFuture so it can pass a retry argument to done(). return super(_AsyncJob, self).result(timeout=timeout) @@ -3014,7 +3039,7 @@ def done(self, retry=DEFAULT_RETRY, timeout=None): How to retry the call that retrieves query results. timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport - before retrying the HTTP request. + before using ``retry``. Returns: bool: True if the job is complete, False otherwise. @@ -3100,7 +3125,7 @@ def _format_for_exception(query, job_id): return template.format(job_id=job_id, header=header, ruler=ruler, body=body) - def _begin(self, client=None, retry=DEFAULT_RETRY): + def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None): """API call: begin the job via a POST request See @@ -3112,32 +3137,39 @@ def _begin(self, client=None, retry=DEFAULT_RETRY): associated with the job object or``NoneType``. retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. Raises: ValueError: If the job has already begun. """ try: - super(QueryJob, self)._begin(client=client, retry=retry) + super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout) except exceptions.GoogleCloudError as exc: exc.message += self._format_for_exception(self.query, self.job_id) exc.query_job = self raise def result( - self, timeout=None, page_size=None, retry=DEFAULT_RETRY, max_results=None + self, page_size=None, max_results=None, retry=DEFAULT_RETRY, timeout=None ): """Start the job and wait for it to complete and get the result. Args: - timeout (float): - How long (in seconds) to wait for job to complete before - raising a :class:`concurrent.futures.TimeoutError`. - page_size (int): - (Optional) The maximum number of rows in each page of results - from this request. Non-positive values are ignored. - retry (google.api_core.retry.Retry): - (Optional) How to retry the call that retrieves rows. + page_size (Optional[int]): + The maximum number of rows in each page of results from this + request. Non-positive values are ignored. + max_results (Optional[int]): + The maximum total number of rows from this request. + retry (Optional[google.api_core.retry.Retry]): + How to retry the call that retrieves rows. 
+ timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. + If multiple requests are made under the hood, ``timeout`` is + interpreted as the approximate total time of **all** requests. Returns: google.cloud.bigquery.table.RowIterator: @@ -3155,13 +3187,27 @@ def result( If the job did not complete in the given timeout. """ try: - super(QueryJob, self).result(timeout=timeout) + guard = TimeoutGuard( + timeout, timeout_error_type=concurrent.futures.TimeoutError + ) + with guard: + super(QueryJob, self).result(retry=retry, timeout=timeout) + timeout = guard.remaining_timeout # Return an iterator instead of returning the job. if not self._query_results: - self._query_results = self._client._get_query_results( - self.job_id, retry, project=self.project, location=self.location + guard = TimeoutGuard( + timeout, timeout_error_type=concurrent.futures.TimeoutError ) + with guard: + self._query_results = self._client._get_query_results( + self.job_id, + retry, + project=self.project, + location=self.location, + timeout=timeout, + ) + timeout = guard.remaining_timeout except exceptions.GoogleCloudError as exc: exc.message += self._format_for_exception(self.query, self.job_id) exc.query_job = self @@ -3181,7 +3227,11 @@ def result( dest_table = Table(dest_table_ref, schema=schema) dest_table._properties["numRows"] = self._query_results.total_rows rows = self._client.list_rows( - dest_table, page_size=page_size, retry=retry, max_results=max_results + dest_table, + page_size=page_size, + max_results=max_results, + retry=retry, + timeout=timeout, ) rows._preserve_order = _contains_order_by(self.query) return rows diff --git a/bigquery/noxfile.py b/bigquery/noxfile.py index 8c041fa6a178..69b96b3dc984 100644 --- a/bigquery/noxfile.py +++ b/bigquery/noxfile.py @@ -34,7 +34,7 @@ def default(session): run the tests. """ # Install all test dependencies, then install local packages in-place. 
- session.install("mock", "pytest", "pytest-cov") + session.install("mock", "pytest", "pytest-cov", "freezegun") for local_dep in LOCAL_DEPS: session.install("-e", local_dep) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index c9166bd5d7c0..a5100fe6eaef 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -24,6 +24,7 @@ import unittest import warnings +import freezegun import mock import requests import six @@ -212,6 +213,28 @@ def test_ctor_w_query_job_config(self): self.assertIsInstance(client._default_query_job_config, QueryJobConfig) self.assertTrue(client._default_query_job_config.dry_run) + def test__call_api_applying_custom_retry_on_timeout(self): + from concurrent.futures import TimeoutError + from google.cloud.bigquery.retry import DEFAULT_RETRY + + client = self._make_one() + + api_request_patcher = mock.patch.object( + client._connection, "api_request", side_effect=[TimeoutError, "result"], + ) + retry = DEFAULT_RETRY.with_deadline(1).with_predicate( + lambda exc: isinstance(exc, TimeoutError) + ) + + with api_request_patcher as fake_api_request: + result = client._call_api(retry, foo="bar") + + self.assertEqual(result, "result") + self.assertEqual( + fake_api_request.call_args_list, + [mock.call(foo="bar"), mock.call(foo="bar")], # was retried once + ) + def test__get_query_results_miss_w_explicit_project_and_timeout(self): from google.cloud.exceptions import NotFound @@ -288,9 +311,9 @@ def test_get_service_account_email(self): resource = {"kind": "bigquery#getServiceAccountResponse", "email": email} conn = client._connection = make_connection(resource) - service_account_email = client.get_service_account_email() + service_account_email = client.get_service_account_email(timeout=7.5) - conn.api_request.assert_called_once_with(method="GET", path=path) + conn.api_request.assert_called_once_with(method="GET", path=path, timeout=7.5) self.assertEqual(service_account_email, email) def test_get_service_account_email_w_alternate_project(self): @@ -305,7 +328,7 @@ def test_get_service_account_email_w_alternate_project(self): service_account_email = client.get_service_account_email(project=project) - conn.api_request.assert_called_once_with(method="GET", path=path) + conn.api_request.assert_called_once_with(method="GET", path=path, timeout=None) self.assertEqual(service_account_email, email) def test_list_projects_defaults(self): @@ -351,7 +374,25 @@ def test_list_projects_defaults(self): self.assertEqual(token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/projects", query_params={} + method="GET", path="/projects", query_params={}, timeout=None + ) + + def test_list_projects_w_timeout(self): + PROJECT_1 = "PROJECT_ONE" + TOKEN = "TOKEN" + DATA = { + "nextPageToken": TOKEN, + "projects": [], + } + creds = _make_credentials() + client = self._make_one(PROJECT_1, creds) + conn = client._connection = make_connection(DATA) + + iterator = client.list_projects(timeout=7.5) + six.next(iterator.pages) + + conn.api_request.assert_called_once_with( + method="GET", path="/projects", query_params={}, timeout=7.5 ) def test_list_projects_explicit_response_missing_projects_key(self): @@ -373,6 +414,7 @@ def test_list_projects_explicit_response_missing_projects_key(self): method="GET", path="/projects", query_params={"maxResults": 3, "pageToken": TOKEN}, + timeout=None, ) def test_list_datasets_defaults(self): @@ -422,18 +464,21 @@ def test_list_datasets_defaults(self): self.assertEqual(token, 
TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={} + method="GET", path="/%s" % PATH, query_params={}, timeout=None ) - def test_list_datasets_w_project(self): + def test_list_datasets_w_project_and_timeout(self): creds = _make_credentials() client = self._make_one(self.PROJECT, creds) conn = client._connection = make_connection({}) - list(client.list_datasets(project="other-project")) + list(client.list_datasets(project="other-project", timeout=7.5)) conn.api_request.assert_called_once_with( - method="GET", path="/projects/other-project/datasets", query_params={} + method="GET", + path="/projects/other-project/datasets", + query_params={}, + timeout=7.5, ) def test_list_datasets_explicit_response_missing_datasets_key(self): @@ -464,6 +509,7 @@ def test_list_datasets_explicit_response_missing_datasets_key(self): "maxResults": 3, "pageToken": TOKEN, }, + timeout=None, ) def test_dataset_with_specified_project(self): @@ -502,9 +548,11 @@ def test_get_dataset(self): conn = client._connection = make_connection(resource) dataset_ref = client.dataset(self.DS_ID) - dataset = client.get_dataset(dataset_ref) + dataset = client.get_dataset(dataset_ref, timeout=7.5) - conn.api_request.assert_called_once_with(method="GET", path="/%s" % path) + conn.api_request.assert_called_once_with( + method="GET", path="/%s" % path, timeout=7.5 + ) self.assertEqual(dataset.dataset_id, self.DS_ID) # Test retry. @@ -579,7 +627,7 @@ def test_create_dataset_minimal(self): ds_ref = client.dataset(self.DS_ID) before = Dataset(ds_ref) - after = client.create_dataset(before) + after = client.create_dataset(before, timeout=7.5) self.assertEqual(after.dataset_id, self.DS_ID) self.assertEqual(after.project, self.PROJECT) @@ -596,6 +644,7 @@ def test_create_dataset_minimal(self): }, "labels": {}, }, + timeout=7.5, ) def test_create_dataset_w_attrs(self): @@ -670,6 +719,7 @@ def test_create_dataset_w_attrs(self): ], "labels": LABELS, }, + timeout=None, ) def test_create_dataset_w_custom_property(self): @@ -707,6 +757,7 @@ def test_create_dataset_w_custom_property(self): "newAlphaProperty": "unreleased property", "labels": {}, }, + timeout=None, ) def test_create_dataset_w_client_location_wo_dataset_location(self): @@ -747,6 +798,7 @@ def test_create_dataset_w_client_location_wo_dataset_location(self): "labels": {}, "location": self.LOCATION, }, + timeout=None, ) def test_create_dataset_w_client_location_w_dataset_location(self): @@ -789,6 +841,7 @@ def test_create_dataset_w_client_location_w_dataset_location(self): "labels": {}, "location": OTHER_LOCATION, }, + timeout=None, ) def test_create_dataset_w_reference(self): @@ -824,6 +877,7 @@ def test_create_dataset_w_reference(self): "labels": {}, "location": self.LOCATION, }, + timeout=None, ) def test_create_dataset_w_fully_qualified_string(self): @@ -859,6 +913,7 @@ def test_create_dataset_w_fully_qualified_string(self): "labels": {}, "location": self.LOCATION, }, + timeout=None, ) def test_create_dataset_w_string(self): @@ -894,6 +949,7 @@ def test_create_dataset_w_string(self): "labels": {}, "location": self.LOCATION, }, + timeout=None, ) def test_create_dataset_alreadyexists_w_exists_ok_false(self): @@ -946,8 +1002,9 @@ def test_create_dataset_alreadyexists_w_exists_ok_true(self): "labels": {}, "location": self.LOCATION, }, + timeout=None, ), - mock.call(method="GET", path=get_path), + mock.call(method="GET", path=get_path, timeout=None), ] ) @@ -968,12 +1025,13 @@ def test_create_routine_w_minimal_resource(self): 
full_routine_id = "test-routine-project.test_routines.minimal_routine" routine = Routine(full_routine_id) - actual_routine = client.create_routine(routine) + actual_routine = client.create_routine(routine, timeout=7.5) conn.api_request.assert_called_once_with( method="POST", path="/projects/test-routine-project/datasets/test_routines/routines", data=resource, + timeout=7.5, ) self.assertEqual( actual_routine.reference, RoutineReference.from_string(full_routine_id) @@ -1004,6 +1062,7 @@ def test_create_routine_w_conflict(self): method="POST", path="/projects/test-routine-project/datasets/test_routines/routines", data=resource, + timeout=None, ) def test_create_routine_w_conflict_exists_ok(self): @@ -1035,10 +1094,12 @@ def test_create_routine_w_conflict_exists_ok(self): method="POST", path="/projects/test-routine-project/datasets/test_routines/routines", data=resource, + timeout=None, ), mock.call( method="GET", path="/projects/test-routine-project/datasets/test_routines/routines/minimal_routine", + timeout=None, ), ] ) @@ -1055,7 +1116,7 @@ def test_create_table_w_day_partition(self): table = Table(self.TABLE_REF) table.time_partitioning = TimePartitioning() - got = client.create_table(table) + got = client.create_table(table, timeout=7.5) conn.api_request.assert_called_once_with( method="POST", @@ -1069,6 +1130,7 @@ def test_create_table_w_day_partition(self): "timePartitioning": {"type": "DAY"}, "labels": {}, }, + timeout=7.5, ) self.assertEqual(table.time_partitioning.type_, "DAY") self.assertEqual(got.table_id, self.TABLE_ID) @@ -1101,6 +1163,7 @@ def test_create_table_w_custom_property(self): "newAlphaProperty": "unreleased property", "labels": {}, }, + timeout=None, ) self.assertEqual(got._properties["newAlphaProperty"], "unreleased property") self.assertEqual(got.table_id, self.TABLE_ID) @@ -1135,6 +1198,7 @@ def test_create_table_w_encryption_configuration(self): "labels": {}, "encryptionConfiguration": {"kmsKeyName": self.KMS_KEY_NAME}, }, + timeout=None, ) self.assertEqual(got.table_id, self.TABLE_ID) @@ -1164,6 +1228,7 @@ def test_create_table_w_day_partition_and_expire(self): "timePartitioning": {"type": "DAY", "expirationMs": "100"}, "labels": {}, }, + timeout=None, ) self.assertEqual(table.time_partitioning.type_, "DAY") self.assertEqual(table.time_partitioning.expiration_ms, 100) @@ -1237,6 +1302,7 @@ def test_create_table_w_schema_and_query(self): "view": {"query": query, "useLegacySql": False}, "labels": {}, }, + timeout=None, ) self.assertEqual(got.table_id, self.TABLE_ID) self.assertEqual(got.project, self.PROJECT) @@ -1284,6 +1350,7 @@ def test_create_table_w_external(self): }, "labels": {}, }, + timeout=None, ) self.assertEqual(got.table_id, self.TABLE_ID) self.assertEqual(got.project, self.PROJECT) @@ -1313,6 +1380,7 @@ def test_create_table_w_reference(self): }, "labels": {}, }, + timeout=None, ) self.assertEqual(got.table_id, self.TABLE_ID) @@ -1338,6 +1406,7 @@ def test_create_table_w_fully_qualified_string(self): }, "labels": {}, }, + timeout=None, ) self.assertEqual(got.table_id, self.TABLE_ID) @@ -1361,6 +1430,7 @@ def test_create_table_w_string(self): }, "labels": {}, }, + timeout=None, ) self.assertEqual(got.table_id, self.TABLE_ID) @@ -1388,6 +1458,7 @@ def test_create_table_alreadyexists_w_exists_ok_false(self): }, "labels": {}, }, + timeout=None, ) def test_create_table_alreadyexists_w_exists_ok_true(self): @@ -1425,8 +1496,9 @@ def test_create_table_alreadyexists_w_exists_ok_true(self): }, "labels": {}, }, + timeout=None, ), - mock.call(method="GET", 
path=get_path), + mock.call(method="GET", path=get_path, timeout=None), ] ) @@ -1460,9 +1532,11 @@ def test_get_model(self): conn = client._connection = make_connection(resource) model_ref = client.dataset(self.DS_ID).model(self.MODEL_ID) - got = client.get_model(model_ref) + got = client.get_model(model_ref, timeout=7.5) - conn.api_request.assert_called_once_with(method="GET", path="/%s" % path) + conn.api_request.assert_called_once_with( + method="GET", path="/%s" % path, timeout=7.5 + ) self.assertEqual(got.model_id, self.MODEL_ID) def test_get_model_w_string(self): @@ -1486,7 +1560,9 @@ def test_get_model_w_string(self): model_id = "{}.{}.{}".format(self.PROJECT, self.DS_ID, self.MODEL_ID) got = client.get_model(model_id) - conn.api_request.assert_called_once_with(method="GET", path="/%s" % path) + conn.api_request.assert_called_once_with( + method="GET", path="/%s" % path, timeout=None + ) self.assertEqual(got.model_id, self.MODEL_ID) def test_get_routine(self): @@ -1513,11 +1589,12 @@ def test_get_routine(self): client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = make_connection(resource) - actual_routine = client.get_routine(routine) + actual_routine = client.get_routine(routine, timeout=7.5) conn.api_request.assert_called_once_with( method="GET", path="/projects/test-routine-project/datasets/test_routines/routines/minimal_routine", + timeout=7.5, ) self.assertEqual( actual_routine.reference, @@ -1546,9 +1623,11 @@ def test_get_table(self): client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) resource = self._make_table_resource() conn = client._connection = make_connection(resource) - table = client.get_table(self.TABLE_REF) + table = client.get_table(self.TABLE_REF, timeout=7.5) - conn.api_request.assert_called_once_with(method="GET", path="/%s" % path) + conn.api_request.assert_called_once_with( + method="GET", path="/%s" % path, timeout=7.5 + ) self.assertEqual(table.table_id, self.TABLE_ID) def test_get_table_sets_user_agent(self): @@ -1623,7 +1702,9 @@ def test_update_dataset(self): ds.labels = LABELS ds.access_entries = [AccessEntry("OWNER", "userByEmail", "phred@example.com")] ds2 = client.update_dataset( - ds, ["description", "friendly_name", "location", "labels", "access_entries"] + ds, + ["description", "friendly_name", "location", "labels", "access_entries"], + timeout=7.5, ) conn.api_request.assert_called_once_with( method="PATCH", @@ -1636,6 +1717,7 @@ def test_update_dataset(self): }, path="/" + PATH, headers=None, + timeout=7.5, ) self.assertEqual(ds2.description, ds.description) self.assertEqual(ds2.friendly_name, ds.friendly_name) @@ -1671,6 +1753,7 @@ def test_update_dataset_w_custom_property(self): data={"newAlphaProperty": "unreleased property"}, path=path, headers=None, + timeout=None, ) self.assertEqual(dataset.dataset_id, self.DS_ID) @@ -1713,7 +1796,7 @@ def test_update_model(self): model.labels = {"x": "y"} updated_model = client.update_model( - model, ["description", "friendly_name", "labels", "expires"] + model, ["description", "friendly_name", "labels", "expires"], timeout=7.5 ) sent = { @@ -1723,7 +1806,7 @@ def test_update_model(self): "labels": {"x": "y"}, } conn.api_request.assert_called_once_with( - method="PATCH", data=sent, path="/" + path, headers=None + method="PATCH", data=sent, path="/" + path, headers=None, timeout=7.5 ) self.assertEqual(updated_model.model_id, model.model_id) self.assertEqual(updated_model.description, model.description) @@ -1775,6 +1858,7 @@ def 
test_update_routine(self): actual_routine = client.update_routine( routine, ["arguments", "language", "body", "type_", "return_type", "someNewField"], + timeout=7.5, ) # TODO: routineReference isn't needed when the Routines API supports @@ -1785,6 +1869,7 @@ def test_update_routine(self): data=sent, path="/projects/routines-project/datasets/test_routines/routines/updated_routine", headers=None, + timeout=7.5, ) self.assertEqual(actual_routine.arguments, routine.arguments) self.assertEqual(actual_routine.body, routine.body) @@ -1846,7 +1931,7 @@ def test_update_table(self): table.labels = {"x": "y"} updated_table = client.update_table( - table, ["schema", "description", "friendly_name", "labels"] + table, ["schema", "description", "friendly_name", "labels"], timeout=7.5 ) sent = { @@ -1871,7 +1956,7 @@ def test_update_table(self): "labels": {"x": "y"}, } conn.api_request.assert_called_once_with( - method="PATCH", data=sent, path="/" + path, headers=None + method="PATCH", data=sent, path="/" + path, headers=None, timeout=7.5 ) self.assertEqual(updated_table.description, table.description) self.assertEqual(updated_table.friendly_name, table.friendly_name) @@ -1907,6 +1992,7 @@ def test_update_table_w_custom_property(self): path="/%s" % path, data={"newAlphaProperty": "unreleased property"}, headers=None, + timeout=None, ) self.assertEqual( updated_table._properties["newAlphaProperty"], "unreleased property" @@ -1935,6 +2021,7 @@ def test_update_table_only_use_legacy_sql(self): path="/%s" % path, data={"view": {"useLegacySql": True}}, headers=None, + timeout=None, ) self.assertEqual(updated_table.view_use_legacy_sql, table.view_use_legacy_sql) @@ -2008,6 +2095,7 @@ def test_update_table_w_query(self): "schema": schema_resource, }, headers=None, + timeout=None, ) def test_update_table_w_schema_None(self): @@ -2086,14 +2174,14 @@ def test_update_table_delete_property(self): self.assertEqual(req[1]["data"], sent) self.assertIsNone(table3.description) - def test_list_tables_empty(self): + def test_list_tables_empty_w_timeout(self): path = "/projects/{}/datasets/{}/tables".format(self.PROJECT, self.DS_ID) creds = _make_credentials() client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = make_connection({}) dataset = client.dataset(self.DS_ID) - iterator = client.list_tables(dataset) + iterator = client.list_tables(dataset, timeout=7.5) self.assertIs(iterator.dataset, dataset) page = six.next(iterator.pages) tables = list(page) @@ -2102,17 +2190,17 @@ def test_list_tables_empty(self): self.assertEqual(tables, []) self.assertIsNone(token) conn.api_request.assert_called_once_with( - method="GET", path=path, query_params={} + method="GET", path=path, query_params={}, timeout=7.5 ) - def test_list_models_empty(self): + def test_list_models_empty_w_timeout(self): path = "/projects/{}/datasets/{}/models".format(self.PROJECT, self.DS_ID) creds = _make_credentials() client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = make_connection({}) dataset_id = "{}.{}".format(self.PROJECT, self.DS_ID) - iterator = client.list_models(dataset_id) + iterator = client.list_models(dataset_id, timeout=7.5) page = six.next(iterator.pages) models = list(page) token = iterator.next_page_token @@ -2120,7 +2208,7 @@ def test_list_models_empty(self): self.assertEqual(models, []) self.assertIsNone(token) conn.api_request.assert_called_once_with( - method="GET", path=path, query_params={} + method="GET", path=path, query_params={}, timeout=7.5 ) def 
test_list_models_defaults(self): @@ -2168,7 +2256,7 @@ def test_list_models_defaults(self): self.assertEqual(token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={} + method="GET", path="/%s" % PATH, query_params={}, timeout=None ) def test_list_models_wrong_type(self): @@ -2177,12 +2265,12 @@ def test_list_models_wrong_type(self): with self.assertRaises(TypeError): client.list_models(client.dataset(self.DS_ID).model("foo")) - def test_list_routines_empty(self): + def test_list_routines_empty_w_timeout(self): creds = _make_credentials() client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = make_connection({}) - iterator = client.list_routines("test-routines.test_routines") + iterator = client.list_routines("test-routines.test_routines", timeout=7.5) page = six.next(iterator.pages) routines = list(page) token = iterator.next_page_token @@ -2193,6 +2281,7 @@ def test_list_routines_empty(self): method="GET", path="/projects/test-routines/datasets/test_routines/routines", query_params={}, + timeout=7.5, ) def test_list_routines_defaults(self): @@ -2244,7 +2333,7 @@ def test_list_routines_defaults(self): self.assertEqual(actual_token, token) conn.api_request.assert_called_once_with( - method="GET", path=path, query_params={} + method="GET", path=path, query_params={}, timeout=None ) def test_list_routines_wrong_type(self): @@ -2305,7 +2394,7 @@ def test_list_tables_defaults(self): self.assertEqual(token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={} + method="GET", path="/%s" % PATH, query_params={}, timeout=None ) def test_list_tables_explicit(self): @@ -2367,6 +2456,7 @@ def test_list_tables_explicit(self): method="GET", path="/%s" % PATH, query_params={"maxResults": 3, "pageToken": TOKEN}, + timeout=None, ) def test_list_tables_wrong_type(self): @@ -2386,9 +2476,9 @@ def test_delete_dataset(self): client = self._make_one(project=self.PROJECT, credentials=creds) conn = client._connection = make_connection(*([{}] * len(datasets))) for arg in datasets: - client.delete_dataset(arg) + client.delete_dataset(arg, timeout=7.5) conn.api_request.assert_called_with( - method="DELETE", path="/%s" % PATH, query_params={} + method="DELETE", path="/%s" % PATH, query_params={}, timeout=7.5 ) def test_delete_dataset_delete_contents(self): @@ -2405,6 +2495,7 @@ def test_delete_dataset_delete_contents(self): method="DELETE", path="/%s" % PATH, query_params={"deleteContents": "true"}, + timeout=None, ) def test_delete_dataset_wrong_type(self): @@ -2425,7 +2516,9 @@ def test_delete_dataset_w_not_found_ok_false(self): with self.assertRaises(google.api_core.exceptions.NotFound): client.delete_dataset(self.DS_ID) - conn.api_request.assert_called_with(method="DELETE", path=path, query_params={}) + conn.api_request.assert_called_with( + method="DELETE", path=path, query_params={}, timeout=None + ) def test_delete_dataset_w_not_found_ok_true(self): path = "/projects/{}/datasets/{}".format(self.PROJECT, self.DS_ID) @@ -2438,7 +2531,9 @@ def test_delete_dataset_w_not_found_ok_true(self): client.delete_dataset(self.DS_ID, not_found_ok=True) - conn.api_request.assert_called_with(method="DELETE", path=path, query_params={}) + conn.api_request.assert_called_with( + method="DELETE", path=path, query_params={}, timeout=None + ) def test_delete_model(self): from google.cloud.bigquery.model import Model @@ -2460,8 +2555,10 @@ def test_delete_model(self): conn = client._connection = 
make_connection(*([{}] * len(models))) for arg in models: - client.delete_model(arg) - conn.api_request.assert_called_with(method="DELETE", path="/%s" % path) + client.delete_model(arg, timeout=7.5) + conn.api_request.assert_called_with( + method="DELETE", path="/%s" % path, timeout=7.5 + ) def test_delete_model_w_wrong_type(self): creds = _make_credentials() @@ -2483,7 +2580,7 @@ def test_delete_model_w_not_found_ok_false(self): with self.assertRaises(google.api_core.exceptions.NotFound): client.delete_model("{}.{}".format(self.DS_ID, self.MODEL_ID)) - conn.api_request.assert_called_with(method="DELETE", path=path) + conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) def test_delete_model_w_not_found_ok_true(self): path = "/projects/{}/datasets/{}/models/{}".format( @@ -2500,7 +2597,7 @@ def test_delete_model_w_not_found_ok_true(self): "{}.{}".format(self.DS_ID, self.MODEL_ID), not_found_ok=True ) - conn.api_request.assert_called_with(method="DELETE", path=path) + conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) def test_delete_routine(self): from google.cloud.bigquery.routine import Routine @@ -2518,10 +2615,11 @@ def test_delete_routine(self): conn = client._connection = make_connection(*([{}] * len(routines))) for routine in routines: - client.delete_routine(routine) + client.delete_routine(routine, timeout=7.5) conn.api_request.assert_called_with( method="DELETE", path="/projects/test-routine-project/datasets/test_routines/routines/minimal_routine", + timeout=7.5, ) def test_delete_routine_w_wrong_type(self): @@ -2544,6 +2642,7 @@ def test_delete_routine_w_not_found_ok_false(self): conn.api_request.assert_called_with( method="DELETE", path="/projects/routines-project/datasets/test_routines/routines/test_routine", + timeout=None, ) def test_delete_routine_w_not_found_ok_true(self): @@ -2561,6 +2660,7 @@ def test_delete_routine_w_not_found_ok_true(self): conn.api_request.assert_called_with( method="DELETE", path="/projects/routines-project/datasets/test_routines/routines/test_routine", + timeout=None, ) def test_delete_table(self): @@ -2586,8 +2686,10 @@ def test_delete_table(self): conn = client._connection = make_connection(*([{}] * len(tables))) for arg in tables: - client.delete_table(arg) - conn.api_request.assert_called_with(method="DELETE", path="/%s" % path) + client.delete_table(arg, timeout=7.5) + conn.api_request.assert_called_with( + method="DELETE", path="/%s" % path, timeout=7.5 + ) def test_delete_table_w_wrong_type(self): creds = _make_credentials() @@ -2609,7 +2711,7 @@ def test_delete_table_w_not_found_ok_false(self): with self.assertRaises(google.api_core.exceptions.NotFound): client.delete_table("{}.{}".format(self.DS_ID, self.TABLE_ID)) - conn.api_request.assert_called_with(method="DELETE", path=path) + conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) def test_delete_table_w_not_found_ok_true(self): path = "/projects/{}/datasets/{}/tables/{}".format( @@ -2626,7 +2728,7 @@ def test_delete_table_w_not_found_ok_true(self): "{}.{}".format(self.DS_ID, self.TABLE_ID), not_found_ok=True ) - conn.api_request.assert_called_with(method="DELETE", path=path) + conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None) def test_job_from_resource_unknown_type(self): from google.cloud.bigquery.job import UnknownJob @@ -2653,6 +2755,7 @@ def test_get_job_miss_w_explict_project(self): method="GET", path="/projects/OTHER_PROJECT/jobs/NONESUCH", query_params={"projection": "full", 
"location": self.LOCATION}, + timeout=None, ) def test_get_job_miss_w_client_location(self): @@ -2671,9 +2774,10 @@ def test_get_job_miss_w_client_location(self): method="GET", path="/projects/OTHER_PROJECT/jobs/NONESUCH", query_params={"projection": "full", "location": self.LOCATION}, + timeout=None, ) - def test_get_job_hit(self): + def test_get_job_hit_w_timeout(self): from google.cloud.bigquery.job import CreateDisposition from google.cloud.bigquery.job import QueryJob from google.cloud.bigquery.job import WriteDisposition @@ -2702,7 +2806,7 @@ def test_get_job_hit(self): client = self._make_one(self.PROJECT, creds) conn = client._connection = make_connection(ASYNC_QUERY_DATA) - job = client.get_job(JOB_ID) + job = client.get_job(JOB_ID, timeout=7.5) self.assertIsInstance(job, QueryJob) self.assertEqual(job.job_id, JOB_ID) @@ -2713,6 +2817,7 @@ def test_get_job_hit(self): method="GET", path="/projects/PROJECT/jobs/query_job", query_params={"projection": "full"}, + timeout=7.5, ) def test_cancel_job_miss_w_explict_project(self): @@ -2731,6 +2836,7 @@ def test_cancel_job_miss_w_explict_project(self): method="POST", path="/projects/OTHER_PROJECT/jobs/NONESUCH/cancel", query_params={"projection": "full", "location": self.LOCATION}, + timeout=None, ) def test_cancel_job_miss_w_client_location(self): @@ -2749,6 +2855,7 @@ def test_cancel_job_miss_w_client_location(self): method="POST", path="/projects/OTHER_PROJECT/jobs/NONESUCH/cancel", query_params={"projection": "full", "location": self.LOCATION}, + timeout=None, ) def test_cancel_job_hit(self): @@ -2777,6 +2884,31 @@ def test_cancel_job_hit(self): method="POST", path="/projects/PROJECT/jobs/query_job/cancel", query_params={"projection": "full"}, + timeout=None, + ) + + def test_cancel_job_w_timeout(self): + JOB_ID = "query_job" + QUERY = "SELECT * from test_dataset:test_table" + QUERY_JOB_RESOURCE = { + "id": "{}:{}".format(self.PROJECT, JOB_ID), + "jobReference": {"projectId": self.PROJECT, "jobId": "query_job"}, + "state": "RUNNING", + "configuration": {"query": {"query": QUERY}}, + } + RESOURCE = {"job": QUERY_JOB_RESOURCE} + + creds = _make_credentials() + client = self._make_one(self.PROJECT, creds) + conn = client._connection = make_connection(RESOURCE) + + client.cancel_job(JOB_ID, timeout=7.5) + + conn.api_request.assert_called_once_with( + method="POST", + path="/projects/{}/jobs/query_job/cancel".format(self.PROJECT), + query_params={"projection": "full"}, + timeout=7.5, ) def test_list_jobs_defaults(self): @@ -2890,7 +3022,10 @@ def test_list_jobs_defaults(self): self.assertEqual(token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={"projection": "full"} + method="GET", + path="/%s" % PATH, + query_params={"projection": "full"}, + timeout=None, ) def test_list_jobs_load_job_wo_sourceUris(self): @@ -2932,7 +3067,10 @@ def test_list_jobs_load_job_wo_sourceUris(self): self.assertEqual(token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={"projection": "full"} + method="GET", + path="/%s" % PATH, + query_params={"projection": "full"}, + timeout=None, ) def test_list_jobs_explicit_missing(self): @@ -2963,6 +3101,7 @@ def test_list_jobs_explicit_missing(self): "allUsers": True, "stateFilter": "done", }, + timeout=None, ) def test_list_jobs_w_project(self): @@ -2976,6 +3115,21 @@ def test_list_jobs_w_project(self): method="GET", path="/projects/other-project/jobs", query_params={"projection": "full"}, + timeout=None, + ) + + def 
test_list_jobs_w_timeout(self): + creds = _make_credentials() + client = self._make_one(self.PROJECT, creds) + conn = client._connection = make_connection({}) + + list(client.list_jobs(timeout=7.5)) + + conn.api_request.assert_called_once_with( + method="GET", + path="/projects/{}/jobs".format(self.PROJECT), + query_params={"projection": "full"}, + timeout=7.5, ) def test_list_jobs_w_time_filter(self): @@ -2999,6 +3153,7 @@ def test_list_jobs_w_time_filter(self): "minCreationTime": "1", "maxCreationTime": str(end_time_millis), }, + timeout=None, ) def test_list_jobs_w_parent_job_filter(self): @@ -3016,6 +3171,7 @@ def test_list_jobs_w_parent_job_filter(self): method="GET", path="/projects/%s/jobs" % self.PROJECT, query_params={"projection": "full", "parentJobId": "parent-job-123"}, + timeout=None, ) conn.api_request.reset_mock() @@ -3048,12 +3204,15 @@ def test_load_table_from_uri(self): destination = client.dataset(self.DS_ID).table(DESTINATION) job = client.load_table_from_uri( - SOURCE_URI, destination, job_id=JOB, job_config=job_config + SOURCE_URI, destination, job_id=JOB, job_config=job_config, timeout=7.5 ) # Check that load_table_from_uri actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE + method="POST", + path="/projects/%s/jobs" % self.PROJECT, + data=RESOURCE, + timeout=7.5, ) # the original config object should not have been modified @@ -3112,7 +3271,10 @@ def test_load_table_from_uri_w_explicit_project(self): # Check that load_table_from_uri actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_load_table_from_uri_w_client_location(self): @@ -3153,7 +3315,10 @@ def test_load_table_from_uri_w_client_location(self): # Check that load_table_from_uri actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_load_table_from_uri_w_invalid_job_config(self): @@ -3354,11 +3519,14 @@ def test_copy_table(self): source = dataset.table(SOURCE) destination = dataset.table(DESTINATION) - job = client.copy_table(source, destination, job_id=JOB) + job = client.copy_table(source, destination, job_id=JOB, timeout=7.5) # Check that copy_table actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE + method="POST", + path="/projects/%s/jobs" % self.PROJECT, + data=RESOURCE, + timeout=7.5, ) self.assertIsInstance(job, CopyJob) @@ -3421,7 +3589,10 @@ def test_copy_table_w_explicit_project(self): # Check that copy_table actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_copy_table_w_client_location(self): @@ -3468,7 +3639,10 @@ def test_copy_table_w_client_location(self): # Check that copy_table actually starts the job. 
conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_copy_table_w_source_strings(self): @@ -3556,7 +3730,10 @@ def test_copy_table_w_valid_job_config(self): # Check that copy_table actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE + method="POST", + path="/projects/%s/jobs" % self.PROJECT, + data=RESOURCE, + timeout=None, ) self.assertIsInstance(job._configuration, CopyJobConfig) @@ -3589,11 +3766,11 @@ def test_extract_table(self): dataset = client.dataset(self.DS_ID) source = dataset.table(SOURCE) - job = client.extract_table(source, DESTINATION, job_id=JOB) + job = client.extract_table(source, DESTINATION, job_id=JOB, timeout=7.5) # Check that extract_table actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/PROJECT/jobs", data=RESOURCE + method="POST", path="/projects/PROJECT/jobs", data=RESOURCE, timeout=7.5, ) # Check the job resource. @@ -3659,7 +3836,10 @@ def test_extract_table_w_explicit_project(self): # Check that extract_table actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_extract_table_w_client_location(self): @@ -3700,7 +3880,10 @@ def test_extract_table_w_client_location(self): # Check that extract_table actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_extract_table_generated_job_id(self): @@ -3743,6 +3926,7 @@ def test_extract_table_generated_job_id(self): self.assertEqual(req["method"], "POST") self.assertEqual(req["path"], "/projects/PROJECT/jobs") self.assertIsInstance(req["data"]["jobReference"]["jobId"], six.string_types) + self.assertIsNone(req["timeout"]) # Check the job resource. self.assertIsInstance(job, ExtractJob) @@ -3787,6 +3971,7 @@ def test_extract_table_w_destination_uris(self): _, req = conn.api_request.call_args self.assertEqual(req["method"], "POST") self.assertEqual(req["path"], "/projects/PROJECT/jobs") + self.assertIsNone(req["timeout"]) # Check the job resource. self.assertIsInstance(job, ExtractJob) @@ -3822,12 +4007,34 @@ def test_query_defaults(self): _, req = conn.api_request.call_args self.assertEqual(req["method"], "POST") self.assertEqual(req["path"], "/projects/PROJECT/jobs") + self.assertIsNone(req["timeout"]) sent = req["data"] self.assertIsInstance(sent["jobReference"]["jobId"], six.string_types) sent_config = sent["configuration"]["query"] self.assertEqual(sent_config["query"], QUERY) self.assertFalse(sent_config["useLegacySql"]) + def test_query_w_explicit_timeout(self): + query = "select count(*) from persons" + resource = { + "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY}, + "configuration": {"query": {"query": query, "useLegacySql": False}}, + } + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection(resource) + + client.query(query, timeout=7.5) + + # Check that query actually starts the job. 
+ conn.api_request.assert_called_once_with( + method="POST", + path="/projects/{}/jobs".format(self.PROJECT), + data=resource, + timeout=7.5, + ) + def test_query_w_explicit_project(self): job_id = "some-job-id" query = "select count(*) from persons" @@ -3850,7 +4057,10 @@ def test_query_w_explicit_project(self): # Check that query actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_query_w_explicit_job_config(self): @@ -3906,7 +4116,7 @@ def test_query_w_explicit_job_config(self): # Check that query actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/PROJECT/jobs", data=resource + method="POST", path="/projects/PROJECT/jobs", data=resource, timeout=None ) # the original config object should not have been modified @@ -3950,7 +4160,7 @@ def test_query_preserving_explicit_job_config(self): # Check that query actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/PROJECT/jobs", data=resource + method="POST", path="/projects/PROJECT/jobs", data=resource, timeout=None ) # the original config object should not have been modified @@ -4002,7 +4212,7 @@ def test_query_preserving_explicit_default_job_config(self): # Check that query actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/PROJECT/jobs", data=resource + method="POST", path="/projects/PROJECT/jobs", data=resource, timeout=None ) # the original default config object should not have been modified @@ -4087,7 +4297,7 @@ def test_query_w_explicit_job_config_override(self): # Check that query actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/PROJECT/jobs", data=resource + method="POST", path="/projects/PROJECT/jobs", data=resource, timeout=None ) def test_query_w_client_default_config_no_incoming(self): @@ -4128,7 +4338,7 @@ def test_query_w_client_default_config_no_incoming(self): # Check that query actually starts the job. conn.api_request.assert_called_once_with( - method="POST", path="/projects/PROJECT/jobs", data=resource + method="POST", path="/projects/PROJECT/jobs", data=resource, timeout=None ) def test_query_w_invalid_default_job_config(self): @@ -4170,7 +4380,10 @@ def test_query_w_client_location(self): # Check that query actually starts the job. 
conn.api_request.assert_called_once_with( - method="POST", path="/projects/other-project/jobs", data=resource + method="POST", + path="/projects/other-project/jobs", + data=resource, + timeout=None, ) def test_query_detect_location(self): @@ -4241,6 +4454,7 @@ def test_query_w_udf_resources(self): _, req = conn.api_request.call_args self.assertEqual(req["method"], "POST") self.assertEqual(req["path"], "/projects/PROJECT/jobs") + self.assertIsNone(req["timeout"]) sent = req["data"] self.assertIsInstance(sent["jobReference"]["jobId"], six.string_types) sent_config = sent["configuration"]["query"] @@ -4296,6 +4510,7 @@ def test_query_w_query_parameters(self): _, req = conn.api_request.call_args self.assertEqual(req["method"], "POST") self.assertEqual(req["path"], "/projects/PROJECT/jobs") + self.assertIsNone(req["timeout"]) sent = req["data"] self.assertEqual(sent["jobReference"]["jobId"], JOB) sent_config = sent["configuration"]["query"] @@ -4310,6 +4525,31 @@ def test_query_w_query_parameters(self): }, ) + def test_insert_rows_w_timeout(self): + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table + + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection({}) + table = Table(self.TABLE_REF) + + ROWS = [ + ("Phred Phlyntstone", 32), + ("Bharney Rhubble", 33), + ] + schema = [ + SchemaField("full_name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + + client.insert_rows(table, ROWS, selected_fields=schema, timeout=7.5) + + conn.api_request.assert_called_once() + _, req = conn.api_request.call_args + self.assertEqual(req.get("timeout"), 7.5) + def test_insert_rows_wo_schema(self): from google.cloud.bigquery.table import Table @@ -4384,6 +4624,7 @@ def _row_data(row): self.assertEqual(req["method"], "POST") self.assertEqual(req["path"], "/%s" % PATH) self.assertEqual(req["data"], SENT) + self.assertIsNone(req["timeout"]) def test_insert_rows_w_list_of_dictionaries(self): import datetime @@ -4448,7 +4689,7 @@ def _row_data(row): self.assertEqual(len(errors), 0) conn.api_request.assert_called_once_with( - method="POST", path="/%s" % PATH, data=SENT + method="POST", path="/%s" % PATH, data=SENT, timeout=None ) def test_insert_rows_w_list_of_Rows(self): @@ -4493,7 +4734,7 @@ def _row_data(row): self.assertEqual(len(errors), 0) conn.api_request.assert_called_once_with( - method="POST", path="/%s" % PATH, data=SENT + method="POST", path="/%s" % PATH, data=SENT, timeout=None ) def test_insert_rows_w_skip_invalid_and_ignore_unknown(self): @@ -4570,7 +4811,7 @@ def _row_data(row): errors[0]["errors"][0], RESPONSE["insertErrors"][0]["errors"][0] ) conn.api_request.assert_called_once_with( - method="POST", path="/%s" % PATH, data=SENT + method="POST", path="/%s" % PATH, data=SENT, timeout=None ) def test_insert_rows_w_repeated_fields(self): @@ -4664,7 +4905,7 @@ def test_insert_rows_w_repeated_fields(self): self.assertEqual(len(errors), 0) conn.api_request.assert_called_once_with( - method="POST", path="/%s" % PATH, data=SENT + method="POST", path="/%s" % PATH, data=SENT, timeout=None, ) def test_insert_rows_w_record_schema(self): @@ -4733,7 +4974,7 @@ def test_insert_rows_w_record_schema(self): self.assertEqual(len(errors), 0) conn.api_request.assert_called_once_with( - method="POST", path="/%s" % PATH, data=SENT + method="POST", path="/%s" % PATH, data=SENT, timeout=None ) def 
test_insert_rows_w_explicit_none_insert_ids(self): @@ -4767,7 +5008,7 @@ def _row_data(row): self.assertEqual(len(errors), 0) conn.api_request.assert_called_once_with( - method="POST", path="/{}".format(PATH), data=SENT + method="POST", path="/{}".format(PATH), data=SENT, timeout=None, ) def test_insert_rows_errors(self): @@ -4835,6 +5076,7 @@ def test_insert_rows_w_numeric(self): project, ds_id, table_id ), data=sent, + timeout=None, ) @unittest.skipIf(pandas is None, "Requires `pandas`") @@ -4871,7 +5113,7 @@ def test_insert_rows_from_dataframe(self): with mock.patch("uuid.uuid4", side_effect=map(str, range(len(dataframe)))): error_info = client.insert_rows_from_dataframe( - table, dataframe, chunk_size=3 + table, dataframe, chunk_size=3, timeout=7.5 ) self.assertEqual(len(error_info), 2) @@ -4910,7 +5152,9 @@ def test_insert_rows_from_dataframe(self): for call, expected_data in six.moves.zip_longest( actual_calls, EXPECTED_SENT_DATA ): - expected_call = mock.call(method="POST", path=API_PATH, data=expected_data) + expected_call = mock.call( + method="POST", path=API_PATH, data=expected_data, timeout=7.5 + ) assert call == expected_call @unittest.skipIf(pandas is None, "Requires `pandas`") @@ -4955,7 +5199,9 @@ def test_insert_rows_from_dataframe_many_columns(self): } ] } - expected_call = mock.call(method="POST", path=API_PATH, data=EXPECTED_SENT_DATA) + expected_call = mock.call( + method="POST", path=API_PATH, data=EXPECTED_SENT_DATA, timeout=None + ) actual_calls = conn.api_request.call_args_list assert len(actual_calls) == 1 @@ -5007,7 +5253,7 @@ def test_insert_rows_from_dataframe_w_explicit_none_insert_ids(self): actual_calls = conn.api_request.call_args_list assert len(actual_calls) == 1 assert actual_calls[0] == mock.call( - method="POST", path=API_PATH, data=EXPECTED_SENT_DATA + method="POST", path=API_PATH, data=EXPECTED_SENT_DATA, timeout=None ) def test_insert_rows_json(self): @@ -5050,11 +5296,11 @@ def test_insert_rows_json(self): } with mock.patch("uuid.uuid4", side_effect=map(str, range(len(ROWS)))): - errors = client.insert_rows_json(table, ROWS) + errors = client.insert_rows_json(table, ROWS, timeout=7.5) self.assertEqual(len(errors), 0) conn.api_request.assert_called_once_with( - method="POST", path="/%s" % PATH, data=SENT + method="POST", path="/%s" % PATH, data=SENT, timeout=7.5, ) def test_insert_rows_json_with_string_id(self): @@ -5077,6 +5323,7 @@ def test_insert_rows_json_with_string_id(self): method="POST", path="/projects/proj/datasets/dset/tables/tbl/insertAll", data=expected, + timeout=None, ) def test_insert_rows_json_w_explicit_none_insert_ids(self): @@ -5098,6 +5345,7 @@ def test_insert_rows_json_w_explicit_none_insert_ids(self): method="POST", path="/projects/proj/datasets/dset/tables/tbl/insertAll", data=expected, + timeout=None, ) def test_list_partitions(self): @@ -5142,6 +5390,43 @@ def test_list_partitions_with_string_id(self): self.assertEqual(len(partition_list), 0) + def test_list_partitions_splitting_timeout_between_requests(self): + from google.cloud.bigquery.table import Table + + row_count = 2 + meta_info = _make_list_partitons_meta_info( + self.PROJECT, self.DS_ID, self.TABLE_ID, row_count + ) + + data = { + "totalRows": str(row_count), + "rows": [{"f": [{"v": "20180101"}]}, {"f": [{"v": "20180102"}]}], + } + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + client._connection = make_connection(meta_info, data) + table = Table(self.TABLE_REF) + + with 
freezegun.freeze_time("2019-01-01 00:00:00", tick=False) as frozen_time: + + def delayed_get_table(*args, **kwargs): + frozen_time.tick(delta=1.4) + return orig_get_table(*args, **kwargs) + + orig_get_table = client.get_table + client.get_table = mock.Mock(side_effect=delayed_get_table) + + client.list_partitions(table, timeout=5.0) + + client.get_table.assert_called_once() + _, kwargs = client.get_table.call_args + self.assertEqual(kwargs.get("timeout"), 5.0) + + client._connection.api_request.assert_called() + _, kwargs = client._connection.api_request.call_args + self.assertAlmostEqual(kwargs.get("timeout"), 3.6, places=5) + def test_list_rows(self): import datetime from google.cloud._helpers import UTC @@ -5202,7 +5487,7 @@ def _bigquery_timestamp_float_repr(ts_float): joined = SchemaField("joined", "TIMESTAMP", mode="NULLABLE") table = Table(self.TABLE_REF, schema=[full_name, age, joined]) - iterator = client.list_rows(table) + iterator = client.list_rows(table, timeout=7.5) page = six.next(iterator.pages) rows = list(page) total_rows = iterator.total_rows @@ -5218,7 +5503,7 @@ def _bigquery_timestamp_float_repr(ts_float): self.assertEqual(page_token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={} + method="GET", path="/%s" % PATH, query_params={}, timeout=7.5 ) def test_list_rows_empty_table(self): @@ -5326,6 +5611,7 @@ def test_list_rows_repeated_fields(self): method="GET", path="/%s" % PATH, query_params={"selectedFields": "color,struct"}, + timeout=None, ) def test_list_rows_w_record_schema(self): @@ -5392,7 +5678,7 @@ def test_list_rows_w_record_schema(self): self.assertEqual(page_token, TOKEN) conn.api_request.assert_called_once_with( - method="GET", path="/%s" % PATH, query_params={} + method="GET", path="/%s" % PATH, query_params={}, timeout=None ) def test_list_rows_with_missing_schema(self): @@ -5446,19 +5732,61 @@ def test_list_rows_with_missing_schema(self): row_iter = client.list_rows(table) - conn.api_request.assert_called_once_with(method="GET", path=table_path) + conn.api_request.assert_called_once_with( + method="GET", path=table_path, timeout=None + ) conn.api_request.reset_mock() self.assertEqual(row_iter.total_rows, 2, msg=repr(table)) rows = list(row_iter) conn.api_request.assert_called_once_with( - method="GET", path=tabledata_path, query_params={} + method="GET", path=tabledata_path, query_params={}, timeout=None ) self.assertEqual(row_iter.total_rows, 3, msg=repr(table)) self.assertEqual(rows[0].name, "Phred Phlyntstone", msg=repr(table)) self.assertEqual(rows[1].age, 31, msg=repr(table)) self.assertIsNone(rows[2].age, msg=repr(table)) + def test_list_rows_splitting_timeout_between_requests(self): + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery.table import Table + + response = {"totalRows": "0", "rows": []} + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + client._connection = make_connection(response, response) + + table = Table( + self.TABLE_REF, schema=[SchemaField("field_x", "INTEGER", mode="NULLABLE")] + ) + + with freezegun.freeze_time("1970-01-01 00:00:00", tick=False) as frozen_time: + + def delayed_get_table(*args, **kwargs): + frozen_time.tick(delta=1.4) + return table + + client.get_table = mock.Mock(side_effect=delayed_get_table) + + rows_iter = client.list_rows( + "{}.{}.{}".format( + self.TABLE_REF.project, + self.TABLE_REF.dataset_id, + self.TABLE_REF.table_id, + ), + timeout=5.0, 
+ ) + six.next(rows_iter.pages) + + client.get_table.assert_called_once() + _, kwargs = client.get_table.call_args + self.assertEqual(kwargs.get("timeout"), 5.0) + + client._connection.api_request.assert_called_once() + _, kwargs = client._connection.api_request.call_args + self.assertAlmostEqual(kwargs.get("timeout"), 3.6) + def test_list_rows_error(self): creds = _make_credentials() http = object() diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index e732bed4dcc6..b796f3f73675 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -18,6 +18,7 @@ import textwrap import unittest +import freezegun import mock import pytest import requests @@ -626,6 +627,7 @@ def test__begin_defaults(self): method="POST", path="/projects/{}/jobs".format(self.PROJECT), data=resource, + timeout=None, ) self.assertEqual(job._properties, resource) @@ -649,13 +651,14 @@ def test__begin_explicit(self): call_api.return_value = resource retry = DEFAULT_RETRY.with_deadline(1) - job._begin(client=client, retry=retry) + job._begin(client=client, retry=retry, timeout=7.5) call_api.assert_called_once_with( retry, method="POST", path="/projects/{}/jobs".format(self.PROJECT), data=resource, + timeout=7.5, ) self.assertEqual(job._properties, resource) @@ -675,6 +678,7 @@ def test_exists_defaults_miss(self): method="GET", path="/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID), query_params={"fields": "id", "location": self.LOCATION}, + timeout=None, ) def test_exists_explicit_hit(self): @@ -702,6 +706,24 @@ def test_exists_explicit_hit(self): method="GET", path="/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID), query_params={"fields": "id"}, + timeout=None, + ) + + def test_exists_w_timeout(self): + from google.cloud.bigquery.retry import DEFAULT_RETRY + + PATH = "/projects/{}/jobs/{}".format(self.PROJECT, self.JOB_ID) + job = self._set_properties_job() + call_api = job._client._call_api = mock.Mock() + + job.exists(timeout=7.5) + + call_api.assert_called_once_with( + DEFAULT_RETRY, + method="GET", + path=PATH, + query_params={"fields": "id"}, + timeout=7.5, ) def test_reload_defaults(self): @@ -780,6 +802,7 @@ def test_cancel_defaults(self): method="POST", path="/projects/{}/jobs/{}/cancel".format(self.PROJECT, self.JOB_ID), query_params={"location": self.LOCATION}, + timeout=None, ) self.assertEqual(job._properties, resource) @@ -798,12 +821,13 @@ def test_cancel_explicit(self): client = _make_client(project=other_project) connection = client._connection = _make_connection(response) - self.assertTrue(job.cancel(client=client)) + self.assertTrue(job.cancel(client=client, timeout=7.5)) connection.api_request.assert_called_once_with( method="POST", path="/projects/{}/jobs/{}/cancel".format(self.PROJECT, self.JOB_ID), query_params={}, + timeout=7.5, ) self.assertEqual(job._properties, resource) @@ -874,7 +898,7 @@ def test_done_defaults_wo_state(self): self.assertFalse(job.done()) - reload_.assert_called_once_with(retry=DEFAULT_RETRY) + reload_.assert_called_once_with(retry=DEFAULT_RETRY, timeout=None) def test_done_explicit_wo_state(self): from google.cloud.bigquery.retry import DEFAULT_RETRY @@ -884,9 +908,9 @@ def test_done_explicit_wo_state(self): reload_ = job.reload = mock.Mock() retry = DEFAULT_RETRY.with_deadline(1) - self.assertFalse(job.done(retry=retry)) + self.assertFalse(job.done(retry=retry, timeout=7.5)) - reload_.assert_called_once_with(retry=retry) + reload_.assert_called_once_with(retry=retry, timeout=7.5) def 
test_done_already(self): client = _make_client(project=self.PROJECT) job = self._make_one(self.JOB_ID, client) @@ -905,7 +929,7 @@ def test_result_default_wo_state(self, result): self.assertIs(job.result(), result.return_value) - begin.assert_called_once_with(retry=DEFAULT_RETRY) + begin.assert_called_once_with(retry=DEFAULT_RETRY, timeout=None) result.assert_called_once_with(timeout=None) @mock.patch("google.api_core.future.polling.PollingFuture.result") @@ -917,7 +941,7 @@ def test_result_w_retry_wo_state(self, result): self.assertIs(job.result(retry=retry), result.return_value) - begin.assert_called_once_with(retry=retry) + begin.assert_called_once_with(retry=retry, timeout=None) result.assert_called_once_with(timeout=None) @mock.patch("google.api_core.future.polling.PollingFuture.result") @@ -933,6 +957,24 @@ def test_result_explicit_w_state(self, result): begin.assert_not_called() result.assert_called_once_with(timeout=timeout) + @mock.patch("google.api_core.future.polling.PollingFuture.result") + def test_result_splitting_timeout_between_requests(self, result): + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, client) + begin = job._begin = mock.Mock() + retry = mock.Mock() + + with freezegun.freeze_time("1970-01-01 00:00:00", tick=False) as frozen_time: + + def delayed_begin(*args, **kwargs): + frozen_time.tick(delta=0.3) + + begin.side_effect = delayed_begin + job.result(retry=retry, timeout=1.0) + + begin.assert_called_once_with(retry=retry, timeout=1.0) + result.assert_called_once_with(timeout=0.7) + def test_cancelled_wo_error_result(self): client = _make_client(project=self.PROJECT) job = self._make_one(self.JOB_ID, client) @@ -2288,6 +2330,7 @@ def test_begin_w_bound_client(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -2325,7 +2368,9 @@ def test_begin_w_autodetect(self): } }, } - conn.api_request.assert_called_once_with(method="POST", path=path, data=sent) + conn.api_request.assert_called_once_with( + method="POST", path=path, data=sent, timeout=None + ) self._verifyResourceProperties(job, resource) def test_begin_w_alternate_client(self): @@ -2449,7 +2494,7 @@ def test_exists_miss_w_bound_client(self): self.assertFalse(job.exists()) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None ) def test_exists_hit_w_alternate_client(self): @@ -2464,7 +2509,7 @@ def test_exists_hit_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None ) def test_exists_miss_w_job_reference(self): @@ -2481,6 +2526,7 @@ def test_exists_miss_w_job_reference(self): method="GET", path="/projects/other-project/jobs/my-job-id", query_params={"fields": "id", "location": "US"}, + timeout=None, ) def test_reload_w_bound_client(self): @@ -2545,7 +2591,7 @@ def test_cancel_w_bound_client(self): job.cancel() conn.api_request.assert_called_once_with( - method="POST", path=PATH, query_params={} + method="POST", path=PATH, query_params={}, timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -2563,7 +2609,7 @@ def test_cancel_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="POST", path=PATH, query_params={} + method="POST", path=PATH, query_params={}, timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ 
-2584,6 +2630,7 @@ def test_cancel_w_job_reference(self): method="POST", path="/projects/alternative-project/jobs/{}/cancel".format(self.JOB_ID), query_params={"location": "US"}, + timeout=None, ) @@ -2898,6 +2945,7 @@ def test_begin_w_bound_client(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -2946,6 +2994,7 @@ def test_begin_w_alternate_client(self): "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, "configuration": {"copy": COPY_CONFIGURATION}, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -2961,7 +3010,7 @@ def test_exists_miss_w_bound_client(self): self.assertFalse(job.exists()) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None, ) def test_exists_hit_w_alternate_client(self): @@ -2978,7 +3027,7 @@ def test_exists_hit_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None ) def test_reload_w_bound_client(self): @@ -3259,6 +3308,7 @@ def test_begin_w_bound_client(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -3308,6 +3358,7 @@ def test_begin_w_alternate_client(self): "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, "configuration": {"extract": EXTRACT_CONFIGURATION}, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -3322,7 +3373,7 @@ def test_exists_miss_w_bound_client(self): self.assertFalse(job.exists()) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None, ) def test_exists_hit_w_alternate_client(self): @@ -3339,7 +3390,7 @@ def test_exists_hit_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None ) def test_reload_w_bound_client(self): @@ -4519,7 +4570,8 @@ def test_result_w_timeout(self): client = _make_client(project=self.PROJECT, connection=connection) job = self._make_one(self.JOB_ID, self.QUERY, client) - job.result(timeout=1.0) + with freezegun.freeze_time("1970-01-01 00:00:00", tick=False): + job.result(timeout=1.0) self.assertEqual(len(connection.api_request.call_args_list), 3) begin_request = connection.api_request.call_args_list[0] @@ -4534,6 +4586,49 @@ def test_result_w_timeout(self): self.assertEqual(query_request[1]["query_params"]["timeoutMs"], 900) self.assertEqual(reload_request[1]["method"], "GET") + @mock.patch("google.api_core.future.polling.PollingFuture.result") + def test_result_splitting_timeout_between_requests(self, polling_result): + begun_resource = self._make_resource() + query_resource = { + "jobComplete": True, + "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, + "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, + "totalRows": "5", + } + done_resource = copy.deepcopy(begun_resource) + done_resource["status"] = {"state": "DONE"} + + connection = _make_connection(begun_resource, query_resource, done_resource) + client = _make_client(project=self.PROJECT, connection=connection) + job = self._make_one(self.JOB_ID, self.QUERY, client) + + client.list_rows = mock.Mock() + + with 
freezegun.freeze_time("1970-01-01 00:00:00", tick=False) as frozen_time: + + def delayed_result(*args, **kwargs): + frozen_time.tick(delta=0.8) + + polling_result.side_effect = delayed_result + + def delayed_get_results(*args, **kwargs): + frozen_time.tick(delta=0.5) + return orig_get_results(*args, **kwargs) + + orig_get_results = client._get_query_results + client._get_query_results = mock.Mock(side_effect=delayed_get_results) + job.result(timeout=2.0) + + polling_result.assert_called_once_with(timeout=2.0) + + client._get_query_results.assert_called_once() + _, kwargs = client._get_query_results.call_args + self.assertAlmostEqual(kwargs.get("timeout"), 1.2) + + client.list_rows.assert_called_once() + _, kwargs = client.list_rows.call_args + self.assertAlmostEqual(kwargs.get("timeout"), 0.7) + def test_result_w_page_size(self): # Arrange query_results_resource = { @@ -4580,12 +4675,16 @@ def test_result_w_page_size(self): conn.api_request.assert_has_calls( [ mock.call( - method="GET", path=tabledata_path, query_params={"maxResults": 3} + method="GET", + path=tabledata_path, + query_params={"maxResults": 3}, + timeout=None, ), mock.call( method="GET", path=tabledata_path, query_params={"pageToken": "some-page-token", "maxResults": 3}, + timeout=None, ), ] ) @@ -4687,6 +4786,28 @@ def test__begin_error(self): expected_line = "{}:{}".format(i, line) assert expected_line in full_text + def test__begin_w_timeout(self): + PATH = "/projects/%s/jobs" % (self.PROJECT,) + RESOURCE = self._make_resource() + + conn = _make_connection(RESOURCE) + client = _make_client(project=self.PROJECT, connection=conn) + job = self._make_one(self.JOB_ID, self.QUERY, client) + + job._begin(timeout=7.5) + + conn.api_request.assert_called_once_with( + method="POST", + path=PATH, + data={ + "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, + "configuration": { + "query": {"query": self.QUERY, "useLegacySql": False} + }, + }, + timeout=7.5, + ) + def test_begin_w_bound_client(self): from google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery.job import QueryJobConfig @@ -4726,6 +4847,7 @@ def test_begin_w_bound_client(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -4795,6 +4917,7 @@ def test_begin_w_alternate_client(self): "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, "configuration": {"dryRun": True, "query": QUERY_CONFIGURATION}, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -4845,6 +4968,7 @@ def test_begin_w_udf(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -4892,6 +5016,7 @@ def test_begin_w_named_query_parameter(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -4935,6 +5060,7 @@ def test_begin_w_positional_query_parameter(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -5011,6 +5137,7 @@ def test_begin_w_table_defs(self): } }, }, + timeout=None, ) self._verifyResourceProperties(job, want_resource) @@ -5043,6 +5170,7 @@ def test_dry_run_query(self): "dryRun": True, }, }, + timeout=None, ) self._verifyResourceProperties(job, RESOURCE) @@ -5055,7 +5183,7 @@ def test_exists_miss_w_bound_client(self): self.assertFalse(job.exists()) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None ) def test_exists_hit_w_alternate_client(self): @@ -5070,7 +5198,7 @@ def 
test_exists_hit_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"} + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None ) def test_reload_w_bound_client(self):
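The timeout-splitting tests above (for Client.list_partitions, Client.list_rows, and QueryJob.result) all exercise the same idea: a caller-supplied timeout is a total budget for a multi-request operation, and each later request receives only whatever the earlier requests left unspent. Below is a minimal sketch of that pattern, assuming only google.auth.transport.requests.TimeoutGuard, a context manager that measures elapsed wall time and exposes the unspent budget via its remaining_timeout attribute; fetch_table_then_rows is an illustrative name, not a library method.

from google.auth.transport.requests import TimeoutGuard

def fetch_table_then_rows(client, table_ref, timeout=None):
    # Hypothetical two-request helper: ``timeout`` is one total budget,
    # not a per-request limit.
    if timeout is not None:
        guard = TimeoutGuard(timeout)
        with guard:
            # First request (table metadata) consumes part of the budget.
            table = client.get_table(table_ref, timeout=timeout)
        # Keep only what the first request left over, e.g. a 5.0 s budget
        # minus 1.4 s elapsed leaves 3.6 s.
        timeout = guard.remaining_timeout
    else:
        table = client.get_table(table_ref)
    # Second request (row data) gets just the remainder, so the combined
    # wall time stays within the caller's original budget.
    return client.list_rows(table, timeout=timeout)

Freezing the clock with freezegun and ticking it by a fixed delta inside the mocked first request is what makes assertions such as assertAlmostEqual(kwargs.get("timeout"), 3.6) deterministic without any real sleeping.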