Skip to content

Commit

Permalink
feat: add job_id, location, project, and query_id properties …
Browse files Browse the repository at this point in the history
…on `RowIterator` (googleapis#1733)

* feat: add `job_id`, `location`, `project`, and `query_id` properties on `RowIterator`

These can be used to recover the original job metadata when `RowIterator` is
the result of a `QueryJob`.

* rename bqstorage_project to billing project

* Update google/cloud/bigquery/table.py

Co-authored-by: Lingqing Gan <lingqing.gan@gmail.com>

---------

Co-authored-by: Lingqing Gan <lingqing.gan@gmail.com>
  • Loading branch information
2 people authored and kiraksi committed Nov 20, 2023
1 parent 705741e commit 570e17a
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 11 deletions.
10 changes: 10 additions & 0 deletions google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3843,6 +3843,8 @@ def list_rows(
# tables can be fetched without a column filter.
selected_fields=selected_fields,
total_rows=getattr(table, "num_rows", None),
project=table.project,
location=table.location,
)
return row_iterator

Expand All @@ -3859,6 +3861,7 @@ def _list_rows_from_query_results(
page_size: Optional[int] = None,
retry: retries.Retry = DEFAULT_RETRY,
timeout: TimeoutType = DEFAULT_TIMEOUT,
query_id: Optional[str] = None,
) -> RowIterator:
"""List the rows of a completed query.
See
Expand Down Expand Up @@ -3898,6 +3901,9 @@ def _list_rows_from_query_results(
would otherwise be a successful response.
If multiple requests are made under the hood, ``timeout``
applies to each individual request.
query_id (Optional[str]):
[Preview] ID of a completed query. This ID is auto-generated
and not guaranteed to be populated.
Returns:
google.cloud.bigquery.table.RowIterator:
Iterator of row data
Expand Down Expand Up @@ -3928,6 +3934,10 @@ def _list_rows_from_query_results(
table=destination,
extra_params=params,
total_rows=total_rows,
project=project,
location=location,
job_id=job_id,
query_id=query_id,
)
return row_iterator

Expand Down
24 changes: 22 additions & 2 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,15 @@ def query(self):
self._properties, ["configuration", "query", "query"]
)

@property
def query_id(self) -> Optional[str]:
    """[Preview] ID of a completed query.

    This ID is auto-generated and not guaranteed to be populated.

    Returns:
        Optional[str]:
            The ``query_id`` of ``self._query_results``, or ``None`` when
            ``self._query_results`` is ``None``.
    """
    results = self._query_results
    if results is None:
        return None
    return results.query_id

@property
def query_parameters(self):
"""See
Expand Down Expand Up @@ -1525,7 +1534,12 @@ def result( # type: ignore # (complaints about the overloaded signature)
provided and the job is not retryable.
"""
if self.dry_run:
return _EmptyRowIterator()
return _EmptyRowIterator(
project=self.project,
location=self.location,
# Intentionally omit job_id and query_id since this doesn't
# actually correspond to a finished query job.
)
try:
retry_do_query = getattr(self, "_retry_do_query", None)
if retry_do_query is not None:
Expand Down Expand Up @@ -1594,7 +1608,12 @@ def do_get_result():
# indicate success and avoid calling tabledata.list on a table which
# can't be read (such as a view table).
if self._query_results.total_rows is None:
return _EmptyRowIterator()
return _EmptyRowIterator(
location=self.location,
project=self.project,
job_id=self.job_id,
query_id=self.query_id,
)

rows = self._client._list_rows_from_query_results(
self.job_id,
Expand All @@ -1608,6 +1627,7 @@ def do_get_result():
start_index=start_index,
retry=retry,
timeout=timeout,
query_id=self.query_id,
)
rows._preserve_order = _contains_order_by(self.query)
return rows
Expand Down
8 changes: 8 additions & 0 deletions google/cloud/bigquery/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,14 @@ def job_id(self):
"""
return self._properties.get("jobReference", {}).get("jobId")

@property
def query_id(self) -> Optional[str]:
    """[Preview] ID of a completed query.

    This ID is auto-generated and not guaranteed to be populated.

    Returns:
        Optional[str]:
            The value stored under the ``"queryId"`` key of the resource
            properties, or ``None`` when that key is absent.
    """
    properties = self._properties
    return properties.get("queryId")

@property
def page_token(self):
"""Token for fetching next batch of results.
Expand Down
49 changes: 46 additions & 3 deletions google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,10 @@ def __init__(
selected_fields=None,
total_rows=None,
first_page_response=None,
location: Optional[str] = None,
job_id: Optional[str] = None,
query_id: Optional[str] = None,
project: Optional[str] = None,
):
super(RowIterator, self).__init__(
client,
Expand All @@ -1575,12 +1579,51 @@ def __init__(
self._field_to_index = _helpers._field_to_index_mapping(schema)
self._page_size = page_size
self._preserve_order = False
self._project = client.project if client is not None else None
self._schema = schema
self._selected_fields = selected_fields
self._table = table
self._total_rows = total_rows
self._first_page_response = first_page_response
self._location = location
self._job_id = job_id
self._query_id = query_id
self._project = project

@property
def _billing_project(self) -> Optional[str]:
    """GCP Project ID where BQ API will bill to (if applicable).

    Returns:
        Optional[str]:
            The ``project`` of ``self.client``, or ``None`` when no client
            is attached.
    """
    bq_client = self.client
    if bq_client is None:
        return None
    return bq_client.project

@property
def job_id(self) -> Optional[str]:
    """ID of the query job, if these rows came from one.

    The job metadata can be fetched with
    ``job = client.get_job(rows.job_id, location=rows.location)``.

    Returns:
        Optional[str]: The job ID supplied at construction, or ``None``.
    """
    return self._job_id

@property
def location(self) -> Optional[str]:
    """Location where the query executed (if applicable).

    See: https://cloud.google.com/bigquery/docs/locations

    Returns:
        Optional[str]: The location supplied at construction, or ``None``.
    """
    return self._location

@property
def project(self) -> Optional[str]:
    """GCP Project ID where these rows are read from.

    Returns:
        Optional[str]: The project supplied at construction, or ``None``.
    """
    return self._project

@property
def query_id(self) -> Optional[str]:
    """[Preview] ID of a completed query.

    This ID is auto-generated and not guaranteed to be populated.

    Returns:
        Optional[str]: The query ID supplied at construction, or ``None``.
    """
    return self._query_id

def _is_completely_cached(self):
"""Check if all results are completely cached.
Expand Down Expand Up @@ -1723,7 +1766,7 @@ def to_arrow_iterable(

bqstorage_download = functools.partial(
_pandas_helpers.download_arrow_bqstorage,
self._project,
self._billing_project,
self._table,
bqstorage_client,
preserve_order=self._preserve_order,
Expand Down Expand Up @@ -1903,7 +1946,7 @@ def to_dataframe_iterable(
column_names = [field.name for field in self._schema]
bqstorage_download = functools.partial(
_pandas_helpers.download_dataframe_bqstorage,
self._project,
self._billing_project,
self._table,
bqstorage_client,
column_names,
Expand Down
18 changes: 17 additions & 1 deletion tests/unit/job/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,7 @@ def test_result(self):
},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "2",
"queryId": "abc-def",
}
job_resource = self._make_resource(started=True, location="EU")
job_resource_done = self._make_resource(started=True, ended=True, location="EU")
Expand Down Expand Up @@ -980,6 +981,10 @@ def test_result(self):
rows = list(result)
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].col1, "abc")
self.assertEqual(result.job_id, self.JOB_ID)
self.assertEqual(result.location, "EU")
self.assertEqual(result.project, self.PROJECT)
self.assertEqual(result.query_id, "abc-def")
# Test that the total_rows property has changed during iteration, based
# on the response from tabledata.list.
self.assertEqual(result.total_rows, 1)
Expand Down Expand Up @@ -1023,6 +1028,12 @@ def test_result_dry_run(self):
calls = conn.api_request.mock_calls
self.assertIsInstance(result, _EmptyRowIterator)
self.assertEqual(calls, [])
self.assertEqual(result.location, "EU")
self.assertEqual(result.project, self.PROJECT)
# Intentionally omit job_id and query_id since this doesn't
# actually correspond to a finished query job.
self.assertIsNone(result.job_id)
self.assertIsNone(result.query_id)

def test_result_with_done_job_calls_get_query_results(self):
query_resource_done = {
Expand Down Expand Up @@ -1180,16 +1191,21 @@ def test_result_w_empty_schema(self):
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": []},
"queryId": "xyz-abc",
}
connection = make_connection(query_resource, query_resource)
client = _make_client(self.PROJECT, connection=connection)
resource = self._make_resource(ended=True)
resource = self._make_resource(ended=True, location="asia-northeast1")
job = self._get_target_class().from_api_repr(resource, client)

result = job.result()

self.assertIsInstance(result, _EmptyRowIterator)
self.assertEqual(list(result), [])
self.assertEqual(result.project, self.PROJECT)
self.assertEqual(result.job_id, self.JOB_ID)
self.assertEqual(result.location, "asia-northeast1")
self.assertEqual(result.query_id, "xyz-abc")

def test_result_invokes_begins(self):
begun_resource = self._make_resource()
Expand Down
12 changes: 8 additions & 4 deletions tests/unit/job/test_query_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ def test_to_dataframe_bqstorage(table_read_options_kwarg):
[name_array, age_array], schema=arrow_schema
)
connection = make_connection(query_resource)
client = _make_client(connection=connection)
client = _make_client(connection=connection, project="bqstorage-billing-project")
job = target_class.from_api_repr(resource, client)
session = bigquery_storage.types.ReadSession()
session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes()
Expand Down Expand Up @@ -597,7 +597,9 @@ def test_to_dataframe_bqstorage(table_read_options_kwarg):
**table_read_options_kwarg,
)
bqstorage_client.create_read_session.assert_called_once_with(
parent=f"projects/{client.project}",
# The billing project can differ from the data project. Make sure we
# are charging to the billing project, not the data project.
parent="projects/bqstorage-billing-project",
read_session=expected_session,
max_stream_count=0, # Use default number of streams for best performance.
)
Expand All @@ -618,7 +620,7 @@ def test_to_dataframe_bqstorage_no_pyarrow_compression():
"schema": {"fields": [{"name": "name", "type": "STRING", "mode": "NULLABLE"}]},
}
connection = make_connection(query_resource)
client = _make_client(connection=connection)
client = _make_client(connection=connection, project="bqstorage-billing-project")
job = target_class.from_api_repr(resource, client)
bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
session = bigquery_storage.types.ReadSession()
Expand Down Expand Up @@ -646,7 +648,9 @@ def test_to_dataframe_bqstorage_no_pyarrow_compression():
data_format=bigquery_storage.DataFormat.ARROW,
)
bqstorage_client.create_read_session.assert_called_once_with(
parent=f"projects/{client.project}",
# The billing project can differ from the data project. Make sure we
# are charging to the billing project, not the data project.
parent="projects/bqstorage-billing-project",
read_session=expected_session,
max_stream_count=0,
)
Expand Down
11 changes: 10 additions & 1 deletion tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6404,11 +6404,16 @@ def test_list_rows(self):
age = SchemaField("age", "INTEGER", mode="NULLABLE")
joined = SchemaField("joined", "TIMESTAMP", mode="NULLABLE")
table = Table(self.TABLE_REF, schema=[full_name, age, joined])
table._properties["location"] = "us-central1"
table._properties["numRows"] = 7

iterator = client.list_rows(table, timeout=7.5)

# Check that initial total_rows is populated from the table.
# Check that initial RowIterator is populated from the table metadata.
self.assertIsNone(iterator.job_id)
self.assertEqual(iterator.location, "us-central1")
self.assertEqual(iterator.project, table.project)
self.assertIsNone(iterator.query_id)
self.assertEqual(iterator.total_rows, 7)
page = next(iterator.pages)
rows = list(page)
Expand Down Expand Up @@ -6524,6 +6529,10 @@ def test_list_rows_empty_table(self):
selected_fields=[],
)

self.assertIsNone(rows.job_id)
self.assertIsNone(rows.location)
self.assertEqual(rows.project, self.TABLE_REF.project)
self.assertIsNone(rows.query_id)
# When a table reference / string and selected_fields is provided,
# total_rows can't be populated until iteration starts.
self.assertIsNone(rows.total_rows)
Expand Down
10 changes: 10 additions & 0 deletions tests/unit/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1386,6 +1386,16 @@ def test_page_token_present(self):
query = self._make_one(resource)
self.assertEqual(query.page_token, "TOKEN")

def test_query_id_missing(self):
    """``query_id`` is ``None`` for an object built from the default resource."""
    resource = self._make_resource()
    self.assertIsNone(self._make_one(resource).query_id)

def test_query_id_present(self):
    """``query_id`` echoes the ``"queryId"`` entry set on the resource."""
    expected = "test-query-id"
    resource = self._make_resource()
    resource["queryId"] = expected
    self.assertEqual(self._make_one(resource).query_id, expected)

def test_total_rows_present_integer(self):
resource = self._make_resource()
resource["totalRows"] = 42
Expand Down
32 changes: 32 additions & 0 deletions tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2117,6 +2117,38 @@ def test_constructor_with_dict_schema(self):
]
self.assertEqual(iterator.schema, expected_schema)

def test_job_id_missing(self):
    """``job_id`` is ``None`` when the iterator is built without one."""
    iterator = self._make_one()
    self.assertIsNone(iterator.job_id)

def test_job_id_present(self):
    """``job_id`` returns the value passed to the constructor."""
    expected = "abc-123"
    iterator = self._make_one(job_id=expected)
    self.assertEqual(iterator.job_id, expected)

def test_location_missing(self):
    """``location`` is ``None`` when the iterator is built without one."""
    iterator = self._make_one()
    self.assertIsNone(iterator.location)

def test_location_present(self):
    """``location`` returns the value passed to the constructor."""
    expected = "asia-northeast1"
    iterator = self._make_one(location=expected)
    self.assertEqual(iterator.location, expected)

def test_project_missing(self):
    """``project`` is ``None`` when the iterator is built without one."""
    iterator = self._make_one()
    self.assertIsNone(iterator.project)

def test_project_present(self):
    """``project`` returns the value passed to the constructor."""
    expected = "test-project"
    iterator = self._make_one(project=expected)
    self.assertEqual(iterator.project, expected)

def test_query_id_missing(self):
    """``query_id`` is ``None`` when the iterator is built without one."""
    iterator = self._make_one()
    self.assertIsNone(iterator.query_id)

def test_query_id_present(self):
    """``query_id`` returns the value passed to the constructor."""
    expected = "xyz-987"
    iterator = self._make_one(query_id=expected)
    self.assertEqual(iterator.query_id, expected)

def test_iterate(self):
from google.cloud.bigquery.schema import SchemaField

Expand Down

0 comments on commit 570e17a

Please sign in to comment.