
Adding ability to give default QueryJobConfig to Client #6088

Merged
merged 11 commits on Sep 26, 2018
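For orientation, a minimal usage sketch of the feature this PR adds, based on the signatures in the diff below (the project name and byte cap are illustrative, and real use also requires credentials):

from google.cloud import bigquery

# A default config the client applies to every query job it starts.
default_config = bigquery.QueryJobConfig()
default_config.maximum_bytes_billed = 10 * 1000 * 1000  # 10 MB cap

client = bigquery.Client(project='my-project',
                         query_job_config=default_config)

# This job picks up the 10 MB cap from the client's default config.
job = client.query('SELECT 1')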
23 changes: 21 additions & 2 deletions bigquery/google/cloud/bigquery/client.py
@@ -122,11 +122,13 @@ class Client(ClientWithProject):
"""The scopes required for authenticating as a BigQuery consumer."""

def __init__(
self, project=None, credentials=None, _http=None, location=None):
self, project=None, credentials=None, _http=None,
location=None, query_job_config=None):
super(Client, self).__init__(
project=project, credentials=credentials, _http=_http)
self._connection = Connection(self)
self._location = location
self._default_query_job_config = query_job_config

@property
def location(self):
@@ -1187,7 +1189,9 @@ def extract_table(
return extract_job

def query(
self, query, job_config=None, job_id=None, job_id_prefix=None,
self, query,
job_config=None, override_job_config=False,
job_id=None, job_id_prefix=None,
location=None, project=None, retry=DEFAULT_RETRY):
"""Run a SQL query.

@@ -1226,6 +1230,21 @@ def query(
if location is None:
location = self.location

# If the caller did not ask to override the client's default config,
# merge the default into whatever config they passed.
if not override_job_config and self._default_query_job_config:
if job_config:
# Any property not set on the incoming config is filled in from
# the default, so the incoming config wins on conflicts.
job_config = job_config.fill_from_default(
self._default_query_job_config)
else:
job_config = self._default_query_job_config
# If the caller asked to override (or the client has no default),
# the incoming config is sent unchanged.

job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(
job_ref, query, client=self, job_config=job_config)
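Put differently, the three paths through the merge above, as a sketch (client is a Client constructed with a default query_job_config; sql and cfg are placeholders):

client.query(sql)                       # no job_config: the client's default config is used as-is
client.query(sql, job_config=cfg)       # cfg is filled in from the default; cfg wins any conflicts
client.query(sql, job_config=cfg,
             override_job_config=True)  # cfg is sent unchanged and the default is ignored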
26 changes: 26 additions & 0 deletions bigquery/google/cloud/bigquery/job.py
@@ -819,6 +819,32 @@ def to_api_repr(self):
"""
return copy.deepcopy(self._properties)

def fill_from_default(self, default_job_config):
"""Merge this job config with a default job config.

Properties set on this config take precedence; any property that is
not set here is filled in from ``default_job_config``. The merge is
shallow (one level deep).

:type default_job_config: :class:`google.cloud.bigquery.job._JobConfig`
:param default_job_config: The default job config to fill from.

:rtype: :class:`google.cloud.bigquery.job._JobConfig`
:returns: A new job config with the merged properties.
"""
if self._job_type != default_job_config._job_type:
raise TypeError(
"attempted to merge two incompatible job types: "
+ repr(self._job_type) + ', '
+ repr(default_job_config._job_type))

new_job_config = self.__class__()
new_job_config._properties = copy.deepcopy(self._properties)

self_job_properties = self._properties[self._job_type]
new_job_properties = new_job_config._properties[self._job_type]
default_job_properties = default_job_config._properties[self._job_type]
for key in default_job_properties:
# Fill in only the properties this config has not set, so values that
# were explicitly set here (even falsy ones) are preserved.
if key not in self_job_properties:
new_job_properties[key] = default_job_properties[key]

return new_job_config
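A quick sketch of fill_from_default in isolation (property values are arbitrary): this config keeps its own value on the conflicting key, gains the property it did not set, and neither input is modified.

from google.cloud.bigquery import QueryJobConfig

incoming = QueryJobConfig()
incoming.maximum_bytes_billed = 1000

default = QueryJobConfig()
default.maximum_bytes_billed = 2000
default.use_query_cache = True

merged = incoming.fill_from_default(default)
# merged.maximum_bytes_billed == 1000  (this config wins the conflict)
# merged.use_query_cache is True       (filled in from the default)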

@classmethod
def from_api_repr(cls, resource):
"""Factory: construct a job configuration given its API representation
163 changes: 163 additions & 0 deletions bigquery/tests/unit/test_client.py
@@ -96,6 +96,27 @@ def test_ctor_w_location(self):
self.assertIs(client._connection.http, http)
self.assertEqual(client.location, location)

def test_ctor_w_query_job_config(self):
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import QueryJobConfig

creds = _make_credentials()
http = object()
location = 'us-central'
job_config = QueryJobConfig()
job_config.dry_run = True

client = self._make_one(project=self.PROJECT, credentials=creds,
_http=http, location=location,
query_job_config=job_config)
self.assertIsInstance(client._connection, Connection)
self.assertIs(client._connection.credentials, creds)
self.assertIs(client._connection.http, http)
self.assertEqual(client.location, location)

self.assertIsInstance(client._default_query_job_config, QueryJobConfig)
self.assertTrue(client._default_query_job_config.dry_run)

def test__get_query_results_miss_w_explicit_project_and_timeout(self):
from google.cloud.exceptions import NotFound

@@ -2707,6 +2728,148 @@ def test_query_w_explicit_project(self):
data=resource,
)

def test_query_w_explicit_job_config(self):
job_id = 'some-job-id'
query = 'select count(*) from persons'
resource = {
'jobReference': {
'jobId': job_id,
'projectId': self.PROJECT,
'location': self.LOCATION,
},
'configuration': {
'query': {
'query': query,
'defaultDataset': {
'projectId': self.PROJECT,
'datasetId': 'some-dataset',
},
'useLegacySql': False,
'useQueryCache': True,
'maximumBytesBilled': '2000',
},
},
}

creds = _make_credentials()
http = object()

from google.cloud.bigquery import QueryJobConfig, DatasetReference
default_job_config = QueryJobConfig()
default_job_config.default_dataset = DatasetReference(
self.PROJECT, 'some-dataset')
default_job_config.maximum_bytes_billed = 1000

client = self._make_one(
project=self.PROJECT, credentials=creds,
_http=http, query_job_config=default_job_config)
conn = client._connection = _make_connection(resource)

job_config = QueryJobConfig()
job_config.use_query_cache = True
job_config.maximum_bytes_billed = 2000

# The passed config is merged with the client's default; passed values win conflicts.
client.query(
query, job_id=job_id, location=self.LOCATION,
job_config=job_config)

# Check that query actually starts the job.
conn.api_request.assert_called_once_with(
method='POST',
path='/projects/PROJECT/jobs',
data=resource,
)

def test_query_w_explicit_job_config_override(self):
job_id = 'some-job-id'
query = 'select count(*) from persons'
resource = {
'jobReference': {
'jobId': job_id,
'projectId': self.PROJECT,
'location': self.LOCATION,
},
'configuration': {
'query': {
'query': query,
'useLegacySql': False,
'useQueryCache': True,
'maximumBytesBilled': '2000',
},
},
}

creds = _make_credentials()
http = object()

from google.cloud.bigquery import QueryJobConfig, DatasetReference
default_job_config = QueryJobConfig()
default_job_config.default_dataset = DatasetReference(
self.PROJECT, 'some-dataset')
default_job_config.maximum_bytes_billed = 1000

client = self._make_one(
project=self.PROJECT, credentials=creds, _http=http,
query_job_config=default_job_config)
conn = client._connection = _make_connection(resource)

job_config = QueryJobConfig()
job_config.use_query_cache = True
job_config.maximum_bytes_billed = 2000

# override_job_config=True: the client's default config is ignored entirely.
client.query(
query, job_id=job_id, location=self.LOCATION,
job_config=job_config, override_job_config=True)

# Check that query actually starts the job.
conn.api_request.assert_called_once_with(
method='POST',
path='/projects/PROJECT/jobs',
data=resource,
)

def test_query_w_client_default_config_no_incoming(self):
job_id = 'some-job-id'
query = 'select count(*) from persons'
resource = {
'jobReference': {
'jobId': job_id,
'projectId': self.PROJECT,
'location': self.LOCATION,
},
'configuration': {
'query': {
'query': query,
'useLegacySql': False,
'maximumBytesBilled': '1000',
},
},
}

creds = _make_credentials()
http = object()

from google.cloud.bigquery import QueryJobConfig
default_job_config = QueryJobConfig()
default_job_config.maximum_bytes_billed = 1000

client = self._make_one(
project=self.PROJECT, credentials=creds, _http=http,
query_job_config=default_job_config)
conn = client._connection = _make_connection(resource)

client.query(
query, job_id=job_id, location=self.LOCATION)

# Check that query actually starts the job.
conn.api_request.assert_called_once_with(
method='POST',
path='/projects/PROJECT/jobs',
data=resource,
)

def test_query_w_client_location(self):
job_id = 'some-job-id'
query = 'select count(*) from persons'
26 changes: 26 additions & 0 deletions bigquery/tests/unit/test_job.py
@@ -912,6 +912,32 @@ def test_ctor(self):
self.assertEqual(job_config._job_type, self.JOB_TYPE)
self.assertEqual(job_config._properties, {self.JOB_TYPE: {}})

def test_fill_from_default(self):
from google.cloud.bigquery import QueryJobConfig

job_config = QueryJobConfig()
job_config.dry_run = True
job_config.maximum_bytes_billed = 1000

default_job_config = QueryJobConfig()
default_job_config.use_query_cache = True
default_job_config.maximum_bytes_billed = 2000

final_job_config = job_config.fill_from_default(default_job_config)
self.assertTrue(final_job_config.dry_run)
self.assertTrue(final_job_config.use_query_cache)
self.assertEqual(final_job_config.maximum_bytes_billed, 1000)

# Merging configs of different job types should raise a TypeError.
basic_job_config = QueryJobConfig()
conflicting_job_config = self._make_one('conflicting_job_type')
self.assertNotEqual(
basic_job_config._job_type, conflicting_job_config._job_type)

with self.assertRaises(TypeError):
basic_job_config.fill_from_default(
conflicting_job_config)

@mock.patch('google.cloud.bigquery._helpers._get_sub_prop')
def test__get_sub_prop_wo_default(self, _get_sub_prop):
job_config = self._make_one()