Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Client.query_and_wait which directly returns a RowIterator of results #1722

Merged
merged 44 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
b8c583a
perf: use the first page a results when `query(api_method="QUERY")`
tswast Nov 15, 2023
6a8059d
add tests
tswast Nov 15, 2023
1f0e38e
respect max_results with cached page
tswast Nov 16, 2023
4401725
respect page_size, also avoid bqstorage if almost fully downloaded
tswast Nov 16, 2023
d078941
skip true test if bqstorage not installed
tswast Nov 16, 2023
660aa76
Merge remote-tracking branch 'origin/main' into issue589-RowIterator.…
tswast Nov 16, 2023
476bcd7
coverage
tswast Nov 16, 2023
05d6a3e
Merge remote-tracking branch 'origin/main' into issue589-RowIterator.…
tswast Nov 20, 2023
c16e4be
feat: add `Client.query_and_wait` which directly returns a `RowIterat…
tswast Nov 15, 2023
222f91b
implement basic query_and_wait and add code sample to test
tswast Nov 20, 2023
73e5817
avoid duplicated QueryJob construction
tswast Nov 20, 2023
9508121
update unit tests
tswast Nov 21, 2023
543481d
Merge remote-tracking branch 'origin/main' into issue589-query_and_wait
tswast Nov 21, 2023
85f1cab
fix merge conflict in rowiterator
tswast Nov 21, 2023
c0e6c86
support max_results, add tests
tswast Nov 21, 2023
d4a322d
retry tests
tswast Nov 21, 2023
e0b2d2e
Merge remote-tracking branch 'origin/main' into issue589-query_and_wait
tswast Nov 21, 2023
9daccbd
unit test coverage
tswast Nov 22, 2023
bba36d2
Merge remote-tracking branch 'origin/main' into issue589-query_and_wait
tswast Nov 22, 2023
adf0b49
dont retry twice
tswast Nov 22, 2023
6dfbf92
Merge remote-tracking branch 'origin/main' into issue589-query_and_wait
tswast Nov 22, 2023
765a644
fix mypy_samples session
tswast Nov 27, 2023
e461ebe
consolidate docstrings for query_and_wait
tswast Nov 27, 2023
895b6d0
remove mention of job ID
tswast Nov 28, 2023
d5345cd
fallback to jobs.insert for unsupported features
tswast Nov 29, 2023
f75d8ab
distinguish API timeout from wait timeout
tswast Nov 29, 2023
baff9d6
add test for jobs.insert fallback
tswast Nov 29, 2023
221898d
populate default job config
tswast Nov 29, 2023
18f825a
refactor default config
tswast Nov 29, 2023
5afbc41
Merge remote-tracking branch 'origin/main' into issue589-query_and_wait
tswast Nov 29, 2023
08167d8
add coverage for job_config
tswast Nov 29, 2023
3e10ea4
cancel job if hasn't finished
tswast Nov 29, 2023
dc5e5be
mypy
tswast Nov 30, 2023
f1556bc
allow unrealeased features in samples
tswast Dec 1, 2023
db71a1b
Merge branch 'main' into issue589-query_and_wait
tswast Dec 3, 2023
bd7e767
fix for 3.12
tswast Dec 4, 2023
4ffec17
Merge branch 'main' into issue589-query_and_wait
tswast Dec 6, 2023
95b3b0e
Merge remote-tracking branch 'origin/main' into issue589-query_and_wait
tswast Dec 7, 2023
f08dac3
fix: keep `RowIterator.total_rows` populated after iteration
tswast Dec 8, 2023
a376bd6
Update google/cloud/bigquery/table.py
tswast Dec 8, 2023
7c3d813
Merge remote-tracking branch 'origin/issue589-query_and_wait' into is…
tswast Dec 8, 2023
304799a
Merge remote-tracking branch 'origin/issue589-total_rows' into issue5…
tswast Dec 8, 2023
425f6b0
fix comments
tswast Dec 8, 2023
ae06cd2
Merge branch 'main' into issue589-query_and_wait
tswast Dec 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 303 additions & 13 deletions google/cloud/bigquery/_job_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for interacting with the job REST APIs from the client."""
"""Helpers for interacting with the job REST APIs from the client.

For queries, there are three cases to consider:

1. jobs.insert: This always returns a job resource.
2. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED:
This sometimes can return the results inline, but always includes a job ID.
3. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL:
This sometimes doesn't create a job at all, instead returning the results.
For better debugging, an auto-generated query ID is included in the
response.

Client.query() calls either (1) or (2), depending on what the user provides
for the api_method parameter. query() always returns a QueryJob object, which
can retry the query when the query job fails for a retriable reason.

Client.query_and_wait() calls (3). This returns a RowIterator that may wrap
local results from the response or may wrap a query job containing multiple
pages of results. Even though query_and_wait() waits for the job to complete,
we still need a separate job_retry object because there are different
predicates where it is safe to generate a new query ID.
"""

import copy
import functools
import os
import uuid
from typing import Any, Dict, TYPE_CHECKING, Optional

Expand All @@ -23,6 +46,7 @@

from google.cloud.bigquery import job
import google.cloud.bigquery.query
from google.cloud.bigquery import table

# Avoid circular imports
if TYPE_CHECKING: # pragma: NO COVER
Expand Down Expand Up @@ -59,6 +83,25 @@ def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> s
return str(uuid.uuid4())


def job_config_with_defaults(
job_config: Optional[job.QueryJobConfig],
default_job_config: Optional[job.QueryJobConfig],
) -> Optional[job.QueryJobConfig]:
"""Create a copy of `job_config`, replacing unset values with those from
`default_job_config`.
"""
if job_config is None:
return default_job_config

if default_job_config is None:
return job_config

# Both job_config and default_job_config are not None, so make a copy of
# job_config merged with default_job_config. Anything already explicitly
# set on job_config should not be replaced.
return job_config._fill_from_default(default_job_config)


def query_jobs_insert(
client: "Client",
query: str,
Expand All @@ -67,9 +110,9 @@ def query_jobs_insert(
job_id_prefix: Optional[str],
location: Optional[str],
project: str,
retry: retries.Retry,
retry: Optional[retries.Retry],
timeout: Optional[float],
job_retry: retries.Retry,
job_retry: Optional[retries.Retry],
) -> job.QueryJob:
"""Initiate a query using jobs.insert.

Expand Down Expand Up @@ -123,7 +166,13 @@ def do_query():
return future


def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any]:
def _to_query_request(
tswast marked this conversation as resolved.
Show resolved Hide resolved
job_config: Optional[job.QueryJobConfig] = None,
*,
query: str,
location: Optional[str] = None,
timeout: Optional[float] = None,
) -> Dict[str, Any]:
"""Transform from Job resource to QueryRequest resource.

Most of the keys in job.configuration.query are in common with
Expand All @@ -150,6 +199,15 @@ def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any
request_body.setdefault("formatOptions", {})
request_body["formatOptions"]["useInt64Timestamp"] = True # type: ignore

if timeout is not None:
# Subtract a buffer for context switching, network latency, etc.
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
tswast marked this conversation as resolved.
Show resolved Hide resolved

if location is not None:
request_body["location"] = location

request_body["query"] = query

return request_body


Expand Down Expand Up @@ -207,6 +265,10 @@ def _to_query_job(
return query_job


def _to_query_path(project: str) -> str:
return f"/projects/{project}/queries"


def query_jobs_query(
client: "Client",
query: str,
Expand All @@ -217,18 +279,14 @@ def query_jobs_query(
timeout: Optional[float],
job_retry: retries.Retry,
) -> job.QueryJob:
"""Initiate a query using jobs.query.
"""Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.

See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
"""
path = f"/projects/{project}/queries"
request_body = _to_query_request(job_config)

if timeout is not None:
# Subtract a buffer for context switching, network latency, etc.
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
request_body["location"] = location
request_body["query"] = query
path = _to_query_path(project)
request_body = _to_query_request(
query=query, job_config=job_config, location=location, timeout=timeout
)

def do_query():
request_body["requestId"] = make_job_id()
Expand All @@ -253,3 +311,235 @@ def do_query():
future._job_retry = job_retry

return future


def query_and_wait(
client: "Client",
query: str,
*,
job_config: Optional[job.QueryJobConfig],
location: Optional[str],
project: str,
api_timeout: Optional[float] = None,
wait_timeout: Optional[float] = None,
retry: Optional[retries.Retry],
job_retry: Optional[retries.Retry],
page_size: Optional[int] = None,
max_results: Optional[int] = None,
) -> table.RowIterator:
"""Run the query, wait for it to finish, and return the results.

While ``jobCreationMode=JOB_CREATION_OPTIONAL`` is in preview in the
``jobs.query`` REST API, use the default ``jobCreationMode`` unless
the environment variable ``QUERY_PREVIEW_ENABLED=true``. After
``jobCreationMode`` is GA, this method will always use
``jobCreationMode=JOB_CREATION_OPTIONAL``. See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query

Args:
client:
BigQuery client to make API calls.
query (str):
SQL query to be executed. Defaults to the standard SQL
dialect. Use the ``job_config`` parameter to change dialects.
job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
Extra configuration options for the job.
To override any options that were previously set in
the ``default_query_job_config`` given to the
``Client`` constructor, manually set those options to ``None``,
or whatever value is preferred.
location (Optional[str]):
Location where to run the job. Must match the location of the
table used in the query as well as the destination table.
project (Optional[str]):
Project ID of the project of where to run the job. Defaults
to the client's project.
api_timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
wait_timeout (Optional[float]):
The number of seconds to wait for the query to finish. If the
query doesn't finish before this timeout, the client attempts
to cancel the query.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC. This only applies to making RPC
calls. It isn't used to retry failed jobs. This has
a reasonable default that should only be overridden
with care.
job_retry (Optional[google.api_core.retry.Retry]):
How to retry failed jobs. The default retries
rate-limit-exceeded errors. Passing ``None`` disables
job retry. Not all jobs can be retried.
page_size (Optional[int]):
The maximum number of rows in each page of results from this
request. Non-positive values are ignored.
max_results (Optional[int]):
The maximum total number of rows from this request.

Returns:
google.cloud.bigquery.table.RowIterator:
Iterator of row data
:class:`~google.cloud.bigquery.table.Row`-s. During each
page, the iterator will have the ``total_rows`` attribute
set, which counts the total number of rows **in the result
set** (this is distinct from the total number of rows in the
current page: ``iterator.page.num_items``).

If the query is a special query that produces no results, e.g.
a DDL query, an ``_EmptyRowIterator`` instance is returned.

Raises:
TypeError:
If ``job_config`` is not an instance of
:class:`~google.cloud.bigquery.job.QueryJobConfig`
class.
"""
# Some API parameters aren't supported by the jobs.query API. In these
# cases, fallback to a jobs.insert call.
if not _supported_by_jobs_query(job_config):
return _wait_or_cancel(
query_jobs_insert(
client=client,
query=query,
job_id=None,
job_id_prefix=None,
job_config=job_config,
location=location,
project=project,
retry=retry,
timeout=api_timeout,
job_retry=job_retry,
),
api_timeout=api_timeout,
wait_timeout=wait_timeout,
retry=retry,
page_size=page_size,
max_results=max_results,
)

path = _to_query_path(project)
request_body = _to_query_request(
query=query, job_config=job_config, location=location, timeout=api_timeout
)

tswast marked this conversation as resolved.
Show resolved Hide resolved
if page_size is not None and max_results is not None:
request_body["maxResults"] = min(page_size, max_results)
elif page_size is not None or max_results is not None:
request_body["maxResults"] = page_size or max_results

if os.getenv("QUERY_PREVIEW_ENABLED", "").casefold() == "true":
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See: Java client changes. Needs to check a set of projects instead of all projects.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tswast , in our previous discussion, I only suggested for this environment variable to use a list of project IDs for a more fine-grained control. We have discussed it with @shollyman , and for the Java client we decided to go with a boolean. I am fine either way, staying consistent with a boolean or doing a more fine-grained project id list (if required by python client usage).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll stick with the boolean for consistency with other client libraries, but definitely worth considering finer grained control in a future release if needed.

request_body["jobCreationMode"] = "JOB_CREATION_OPTIONAL"

def do_query():
request_body["requestId"] = make_job_id()
span_attributes = {"path": path}

# For easier testing, handle the retries ourselves.
if retry is not None:
response = retry(client._call_api)(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to be careful with timeout here. requestId should help, but need to make sure the retry includes the requestId.

retry=None, # We're calling the retry decorator ourselves.
span_name="BigQuery.query",
span_attributes=span_attributes,
method="POST",
path=path,
data=request_body,
timeout=api_timeout,
)
else:
response = client._call_api(
retry=None,
span_name="BigQuery.query",
span_attributes=span_attributes,
method="POST",
path=path,
data=request_body,
timeout=api_timeout,
)

# Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
# to fetch, there will be a job ID for jobs.getQueryResults.
query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
response
)
page_token = query_results.page_token
more_pages = page_token is not None

if more_pages or not query_results.complete:
# TODO(swast): Avoid a call to jobs.get in some cases (few
# remaining pages) by waiting for the query to finish and calling
# client._list_rows_from_query_results directly. Need to update
# RowIterator to fetch destination table via the job ID if needed.
return _wait_or_cancel(
_to_query_job(client, query, job_config, response),
api_timeout=api_timeout,
wait_timeout=wait_timeout,
retry=retry,
page_size=page_size,
max_results=max_results,
)

return table.RowIterator(
client=client,
api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
path=None,
schema=query_results.schema,
max_results=max_results,
page_size=page_size,
total_rows=query_results.total_rows,
first_page_response=response,
location=query_results.location,
job_id=query_results.job_id,
query_id=query_results.query_id,
project=query_results.project,
)

if job_retry is not None:
return job_retry(do_query)()
else:
return do_query()


def _supported_by_jobs_query(job_config: Optional[job.QueryJobConfig]) -> bool:
"""True if jobs.query can be used. False if jobs.insert is needed."""
if job_config is None:
return True

return (
# These features aren't supported by jobs.query.
job_config.clustering_fields is None
and job_config.destination is None
and job_config.destination_encryption_configuration is None
and job_config.range_partitioning is None
and job_config.table_definitions is None
and job_config.time_partitioning is None
)


def _wait_or_cancel(
job: job.QueryJob,
api_timeout: Optional[float],
wait_timeout: Optional[float],
retry: Optional[retries.Retry],
page_size: Optional[int],
max_results: Optional[int],
) -> table.RowIterator:
"""Wait for a job to complete and return the results.

If we can't return the results within the ``wait_timeout``, try to cancel
the job.
"""
try:
return job.result(
page_size=page_size,
max_results=max_results,
retry=retry,
timeout=wait_timeout,
)
except Exception:
# Attempt to cancel the job since we can't return the results.
try:
job.cancel(retry=retry, timeout=api_timeout)
except Exception:
# Don't eat the original exception if cancel fails.
pass
raise
Loading