Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding ability to give default QueryJobConfig to Client #6088

Merged
merged 11 commits into from
Sep 26, 2018
28 changes: 25 additions & 3 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,11 @@ class Client(ClientWithProject):
current object.
This parameter should be considered private, and could change in
the future.
location str:
location (str):
(Optional) Default location for jobs / datasets / tables.
default_query_job_config (google.cloud.bigquery.job.QueryJobConfig):
(Optional) Default ``QueryJobConfig``.
Will be merged into job configs passed into the ``query`` method.

Raises:
google.auth.exceptions.DefaultCredentialsError:
Expand All @@ -122,11 +125,13 @@ class Client(ClientWithProject):
"""The scopes required for authenticating as a BigQuery consumer."""

def __init__(
self, project=None, credentials=None, _http=None, location=None):
self, project=None, credentials=None, _http=None,
location=None, default_query_job_config=None):
super(Client, self).__init__(
project=project, credentials=credentials, _http=_http)
self._connection = Connection(self)
self._location = location
self._default_query_job_config = default_query_job_config

@property
def location(self):
Expand Down Expand Up @@ -1187,7 +1192,9 @@ def extract_table(
return extract_job

def query(
self, query, job_config=None, job_id=None, job_id_prefix=None,
self, query,
job_config=None,
job_id=None, job_id_prefix=None,
location=None, project=None, retry=DEFAULT_RETRY):
"""Run a SQL query.

Expand All @@ -1202,6 +1209,10 @@ def query(
Keyword Arguments:
job_config (google.cloud.bigquery.job.QueryJobConfig):
(Optional) Extra configuration options for the job.
To override any options that were previously set in
the ``default_query_job_config`` given to the
``Client`` constructor, manually set those options to ``None``,
or whatever value is preferred.
job_id (str): (Optional) ID to use for the query job.
job_id_prefix (str):
(Optional) The prefix to use for a randomly generated job ID.
Expand All @@ -1226,6 +1237,17 @@ def query(
if location is None:
location = self.location

if self._default_query_job_config:
if job_config:
# anything that's not defined on the incoming
# that is in the default,
# should be filled in with the default
# the incoming therefore has precedence
job_config = job_config._fill_from_default(
self._default_query_job_config)
else:
job_config = self._default_query_job_config

job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(
job_ref, query, client=self, job_config=job_config)
Expand Down
33 changes: 33 additions & 0 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,39 @@ def to_api_repr(self):
"""
return copy.deepcopy(self._properties)

def _fill_from_default(self, default_job_config):
"""Merge this job config with a default job config.

The keys in this object take precedence over the keys in the default
config. The merge is done at the top-level as well as for keys one
level below the job type.

Arguments:
default_job_config (google.cloud.bigquery.job._JobConfig):
The default job config that will be used to fill in self.

Returns:
google.cloud.bigquery.job._JobConfig A new (merged) job config.
"""
if self._job_type != default_job_config._job_type:
raise TypeError(
"attempted to merge two incompatible job types: "
+ repr(self._job_type) + ', '
+ repr(default_job_config._job_type))

new_job_config = self.__class__()

default_job_properties = copy.deepcopy(default_job_config._properties)
for key in self._properties:
if key != self._job_type:
default_job_properties[key] = self._properties[key]

default_job_properties[self._job_type] \
.update(self._properties[self._job_type])
new_job_config._properties = default_job_properties

return new_job_config

@classmethod
def from_api_repr(cls, resource):
"""Factory: construct a job configuration given its API representation
Expand Down
164 changes: 164 additions & 0 deletions bigquery/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,27 @@ def test_ctor_w_location(self):
self.assertIs(client._connection.http, http)
self.assertEqual(client.location, location)

def test_ctor_w_query_job_config(self):
    """The Client ctor should store a default ``QueryJobConfig`` verbatim."""
    from google.cloud.bigquery import QueryJobConfig
    from google.cloud.bigquery._http import Connection

    credentials = _make_credentials()
    http_obj = object()
    default_config = QueryJobConfig()
    default_config.dry_run = True

    client = self._make_one(
        project=self.PROJECT,
        credentials=credentials,
        _http=http_obj,
        location='us-central',
        default_query_job_config=default_config,
    )

    self.assertIsInstance(client._connection, Connection)
    self.assertIs(client._connection.credentials, credentials)
    self.assertIs(client._connection.http, http_obj)
    self.assertEqual(client.location, 'us-central')
    self.assertIsInstance(client._default_query_job_config, QueryJobConfig)
    self.assertTrue(client._default_query_job_config.dry_run)

def test__get_query_results_miss_w_explicit_project_and_timeout(self):
from google.cloud.exceptions import NotFound

Expand Down Expand Up @@ -2707,6 +2728,149 @@ def test_query_w_explicit_project(self):
data=resource,
)

def test_query_w_explicit_job_config(self):
    # Verify that an explicit ``job_config`` passed to ``Client.query`` is
    # merged with the client's ``default_query_job_config``: options set on
    # the per-call config (maximumBytesBilled) win, while options only
    # present on the default (defaultDataset) are filled in.
    job_id = 'some-job-id'
    query = 'select count(*) from persons'
    # Expected request payload: defaultDataset comes from the client-level
    # default config; maximumBytesBilled reflects the per-call override.
    resource = {
        'jobReference': {
            'jobId': job_id,
            'projectId': self.PROJECT,
            'location': self.LOCATION,
        },
        'configuration': {
            'query': {
                'query': query,
                'defaultDataset': {
                    'projectId': self.PROJECT,
                    'datasetId': 'some-dataset',
                },
                'useLegacySql': False,
                'useQueryCache': True,
                'maximumBytesBilled': '2000',
            },
        },
    }

    creds = _make_credentials()
    http = object()

    from google.cloud.bigquery import QueryJobConfig, DatasetReference
    # Client-wide default: a default dataset plus a 1000-byte billing cap.
    default_job_config = QueryJobConfig()
    default_job_config.default_dataset = DatasetReference(
        self.PROJECT, 'some-dataset')
    default_job_config.maximum_bytes_billed = 1000

    client = self._make_one(
        project=self.PROJECT, credentials=creds,
        _http=http, default_query_job_config=default_job_config)
    conn = client._connection = _make_connection(resource)

    # Per-call config: overrides the billing cap and enables the cache.
    job_config = QueryJobConfig()
    job_config.use_query_cache = True
    job_config.maximum_bytes_billed = 2000

    client.query(
        query, job_id=job_id, location=self.LOCATION,
        job_config=job_config)

    # Check that query actually starts the job.
    conn.api_request.assert_called_once_with(
        method='POST',
        path='/projects/PROJECT/jobs',
        data=resource,
    )

def test_query_w_explicit_job_config_override(self):
    # Verify that a per-call ``job_config`` can explicitly *clear* an
    # option set on the client's ``default_query_job_config`` by assigning
    # ``None`` (here: ``default_dataset``), as well as override scalars.
    job_id = 'some-job-id'
    query = 'select count(*) from persons'
    # Expected payload: defaultDataset is nulled out by the override and
    # maximumBytesBilled reflects the per-call value.
    resource = {
        'jobReference': {
            'jobId': job_id,
            'projectId': self.PROJECT,
            'location': self.LOCATION,
        },
        'configuration': {
            'query': {
                'query': query,
                'defaultDataset': None,
                'useLegacySql': False,
                'useQueryCache': True,
                'maximumBytesBilled': '2000',
            },
        },
    }

    creds = _make_credentials()
    http = object()

    from google.cloud.bigquery import QueryJobConfig, DatasetReference
    # Client-wide default sets a dataset and a 1000-byte billing cap.
    default_job_config = QueryJobConfig()
    default_job_config.default_dataset = DatasetReference(
        self.PROJECT, 'some-dataset')
    default_job_config.maximum_bytes_billed = 1000

    client = self._make_one(
        project=self.PROJECT, credentials=creds, _http=http,
        default_query_job_config=default_job_config)
    conn = client._connection = _make_connection(resource)

    # Per-call config: overrides the cap and clears the default dataset.
    job_config = QueryJobConfig()
    job_config.use_query_cache = True
    job_config.maximum_bytes_billed = 2000
    job_config.default_dataset = None

    client.query(
        query, job_id=job_id, location=self.LOCATION,
        job_config=job_config,
    )

    # Check that query actually starts the job.
    conn.api_request.assert_called_once_with(
        method='POST',
        path='/projects/PROJECT/jobs',
        data=resource,
    )

def test_query_w_client_default_config_no_incoming(self):
    # Verify that when no per-call ``job_config`` is supplied, ``query``
    # falls back to the client's ``default_query_job_config`` unchanged.
    job_id = 'some-job-id'
    query = 'select count(*) from persons'
    # Expected payload: maximumBytesBilled comes straight from the default.
    resource = {
        'jobReference': {
            'jobId': job_id,
            'projectId': self.PROJECT,
            'location': self.LOCATION,
        },
        'configuration': {
            'query': {
                'query': query,
                'useLegacySql': False,
                'maximumBytesBilled': '1000',
            },
        },
    }

    creds = _make_credentials()
    http = object()

    from google.cloud.bigquery import QueryJobConfig
    default_job_config = QueryJobConfig()
    default_job_config.maximum_bytes_billed = 1000

    client = self._make_one(
        project=self.PROJECT, credentials=creds, _http=http,
        default_query_job_config=default_job_config)
    conn = client._connection = _make_connection(resource)

    # No job_config argument: the default must be used as-is.
    client.query(
        query, job_id=job_id, location=self.LOCATION)

    # Check that query actually starts the job.
    conn.api_request.assert_called_once_with(
        method='POST',
        path='/projects/PROJECT/jobs',
        data=resource,
    )

def test_query_w_client_location(self):
job_id = 'some-job-id'
query = 'select count(*) from persons'
Expand Down
28 changes: 28 additions & 0 deletions bigquery/tests/unit/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,34 @@ def test_ctor(self):
self.assertEqual(job_config._job_type, self.JOB_TYPE)
self.assertEqual(job_config._properties, {self.JOB_TYPE: {}})

def test_fill_from_default(self):
    """Explicitly-set options must win over the defaults when merging."""
    from google.cloud.bigquery import QueryJobConfig

    explicit = QueryJobConfig()
    explicit.dry_run = True
    explicit.maximum_bytes_billed = 1000

    defaults = QueryJobConfig()
    defaults.use_query_cache = True
    defaults.maximum_bytes_billed = 2000

    merged = explicit._fill_from_default(defaults)

    # Options present on only one side survive; the conflicting option
    # (maximum_bytes_billed) keeps the explicit value.
    self.assertTrue(merged.dry_run)
    self.assertTrue(merged.use_query_cache)
    self.assertEqual(merged.maximum_bytes_billed, 1000)

def test_fill_from_default_conflict(self):
    """Merging configs of different job types must raise ``TypeError``."""
    from google.cloud.bigquery import QueryJobConfig

    query_config = QueryJobConfig()
    other_config = self._make_one('conflicting_job_type')

    # Sanity-check the fixture: the two configs really differ in job type.
    self.assertNotEqual(query_config._job_type, other_config._job_type)

    with self.assertRaises(TypeError):
        query_config._fill_from_default(other_config)

@mock.patch('google.cloud.bigquery._helpers._get_sub_prop')
def test__get_sub_prop_wo_default(self, _get_sub_prop):
job_config = self._make_one()
Expand Down