Skip to content

Commit

Permalink
feat(bigquery): add support for listing jobs by parent job (#9225)
Browse files Browse the repository at this point in the history
* Add parent_job_id, num_child_jobs to *Job classes

* Add parent_job parameter to client.list_jobs()

* Add system test for listing scripting jobs
  • Loading branch information
plamut committed Sep 27, 2019
1 parent 2bb22ae commit c91eb11
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 0 deletions.
10 changes: 10 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,7 @@ def cancel_job(self, job_id, project=None, location=None, retry=DEFAULT_RETRY):
def list_jobs(
self,
project=None,
parent_job=None,
max_results=None,
page_token=None,
all_users=None,
Expand All @@ -1233,6 +1234,11 @@ def list_jobs(
project (str, optional):
Project ID to use for retrieving jobs. Defaults
to the client's project.
parent_job (Optional[Union[ \
:class:`~google.cloud.bigquery.job._AsyncJob`, \
str, \
]]):
If set, retrieve only child jobs of the specified parent.
max_results (int, optional):
Maximum number of jobs to return.
page_token (str, optional):
Expand Down Expand Up @@ -1265,6 +1271,9 @@ def list_jobs(
google.api_core.page_iterator.Iterator:
Iterable of job instances.
"""
if isinstance(parent_job, job._AsyncJob):
parent_job = parent_job.job_id

extra_params = {
"allUsers": all_users,
"stateFilter": state_filter,
Expand All @@ -1275,6 +1284,7 @@ def list_jobs(
google.cloud._helpers._millis_from_datetime(max_creation_time)
),
"projection": "full",
"parentJobId": parent_job,
}

extra_params = {
Expand Down
25 changes: 25 additions & 0 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,31 @@ def job_id(self):
"""str: ID of the job."""
return _helpers._get_sub_prop(self._properties, ["jobReference", "jobId"])

@property
def parent_job_id(self):
    """Optional[str]: ID of the parent job, if the job resource has one.

    The value is looked up under ``statistics.parentJobId`` in the job's
    properties.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.parent_job_id
    """
    key_path = ["statistics", "parentJobId"]
    return _helpers._get_sub_prop(self._properties, key_path)

@property
def num_child_jobs(self):
    """int: Number of child jobs executed.

    The value is looked up under ``statistics.numChildJobs`` in the job's
    properties and converted to ``int``; ``0`` is returned when the lookup
    yields ``None``.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics.FIELDS.num_child_jobs
    """
    raw_count = _helpers._get_sub_prop(
        self._properties, ["statistics", "numChildJobs"]
    )
    if raw_count is None:
        return 0
    return int(raw_count)

@property
def project(self):
"""Project bound to the job.
Expand Down
48 changes: 48 additions & 0 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,54 @@ def test_list_tables(self):
)
self.assertGreater(len(list(iterator)), 0)

def test_listing_scripting_jobs(self):
    """Run a multi-statement SQL script and list its parent/child jobs.

    Verifies that the script's parent job reports a positive
    ``num_child_jobs``, that the same number of child jobs is visible in a
    time-filtered listing, and that ``list_jobs(parent_job=...)`` returns
    exactly those child jobs.
    """
    # run an SQL script
    sql_script = """
-- Declare a variable to hold names as an array.
DECLARE top_names ARRAY<STRING>;
-- Build an array of the top 100 names from the year 2017.
SET top_names = (
SELECT ARRAY_AGG(name ORDER BY number DESC LIMIT 100)
FROM `bigquery-public-data.usa_names.usa_1910_current`
WHERE year = 2017
);
-- Which names appear as words in Shakespeare's plays?
SELECT
name AS shakespeare_name
FROM UNNEST(top_names) AS name
WHERE name IN (
SELECT word
FROM `bigquery-public-data.samples.shakespeare`
);
"""
    test_start = datetime.datetime.utcnow()
    query_job = Config.CLIENT.query(sql_script, project=Config.CLIENT.project)
    query_job.result()

    # fetch jobs created since the script started
    script_jobs = list(Config.CLIENT.list_jobs(min_creation_time=test_start))

    # Select the parent and its children by ID rather than by
    # "has / has no child jobs": the time-filtered listing can also contain
    # unrelated jobs started concurrently in the same project, and those
    # must not be counted as children of this script.
    script_job_id = query_job.job_id
    parent_jobs = [
        listed for listed in script_jobs if listed.job_id == script_job_id
    ]
    child_jobs = [
        listed for listed in script_jobs if listed.parent_job_id == script_job_id
    ]

    assert len(parent_jobs) == 1
    assert parent_jobs[0].num_child_jobs > 0
    assert len(child_jobs) == parent_jobs[0].num_child_jobs

    # fetch jobs using the parent job filter, verify that results are as expected
    fetched_jobs = list(Config.CLIENT.list_jobs(parent_job=parent_jobs[0]))
    assert sorted(job.job_id for job in fetched_jobs) == sorted(
        job.job_id for job in child_jobs
    )

def test_update_table(self):
dataset = self.temp_dataset(_make_dataset_id("update_table"))

Expand Down
18 changes: 18 additions & 0 deletions bigquery/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2952,6 +2952,24 @@ def test_list_jobs_w_time_filter(self):
},
)

def test_list_jobs_w_parent_job_filter(self):
    """``list_jobs`` sends the same ``parentJobId`` for a str or a job object."""
    from google.cloud.bigquery import job

    client = self._make_one(self.PROJECT, _make_credentials())
    # One canned (empty) response per list_jobs() call below.
    conn = client._connection = make_connection({}, {})

    parent_id = "parent-job-123"
    expected_path = "/projects/%s/jobs" % self.PROJECT
    expected_query = {"projection": "full", "parentJobId": "parent-job-123"}

    # The parent may be given either as a plain job ID or as a job instance.
    candidates = (parent_id, job._AsyncJob(parent_id, client))

    for candidate in candidates:
        list(client.list_jobs(parent_job=candidate))
        conn.api_request.assert_called_once_with(
            method="GET", path=expected_path, query_params=expected_query
        )
        # Clear the recorded call so the next iteration can assert "once" again.
        conn.api_request.reset_mock()

def test_load_table_from_uri(self):
from google.cloud.bigquery.job import LoadJob

Expand Down
16 changes: 16 additions & 0 deletions bigquery/tests/unit/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,22 @@ def test_job_type(self):

self.assertEqual(derived.job_type, "derived")

def test_parent_job_id(self):
    """``parent_job_id`` is ``None`` until ``statistics.parentJobId`` is set."""
    async_job = self._make_one(self.JOB_ID, _make_client(project=self.PROJECT))

    # A freshly constructed job has no statistics, hence no parent.
    self.assertIsNone(async_job.parent_job_id)

    async_job._properties["statistics"] = {"parentJobId": "parent-job-123"}
    self.assertEqual(async_job.parent_job_id, "parent-job-123")

def test_num_child_jobs(self):
    """``num_child_jobs`` defaults to 0 and converts the stored value to int."""
    async_job = self._make_one(self.JOB_ID, _make_client(project=self.PROJECT))

    # A freshly constructed job has no statistics, so the count is zero.
    self.assertEqual(async_job.num_child_jobs, 0)

    # The stored count is a string here; the property must return an int.
    async_job._properties["statistics"] = {"numChildJobs": "17"}
    self.assertEqual(async_job.num_child_jobs, 17)

def test_labels_miss(self):
client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, client)
Expand Down

0 comments on commit c91eb11

Please sign in to comment.