diff --git a/bigquery/google/cloud/bigquery/_helpers.py b/bigquery/google/cloud/bigquery/_helpers.py index eb5161c9fe71..bcb9d0696bc3 100644 --- a/bigquery/google/cloud/bigquery/_helpers.py +++ b/bigquery/google/cloud/bigquery/_helpers.py @@ -658,3 +658,18 @@ def _build_resource_from_properties(obj, filter_fields): partial[filter_field] = obj._properties[filter_field] return partial + + +def _verify_job_config_type(job_config, expected_type, param_name="job_config"): + if not isinstance(job_config, expected_type): + msg = ( + "Expected an instance of {expected_type} class for the {param_name} parameter, " + "but received {param_name} = {job_config}" + ) + raise TypeError( + msg.format( + expected_type=expected_type.__name__, + param_name=param_name, + job_config=job_config, + ) + ) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index 1ad107ba8151..e7810dbbd66a 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -53,6 +53,7 @@ from google.cloud.bigquery._helpers import _record_field_to_json from google.cloud.bigquery._helpers import _str_or_none +from google.cloud.bigquery._helpers import _verify_job_config_type from google.cloud.bigquery._http import Connection from google.cloud.bigquery import _pandas_helpers from google.cloud.bigquery.dataset import Dataset @@ -1355,6 +1356,11 @@ def load_table_from_uri( Returns: google.cloud.bigquery.job.LoadJob: A new load job. + + Raises: + TypeError: + If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` + class. 
""" job_id = _make_job_id(job_id, job_id_prefix) @@ -1370,6 +1376,10 @@ def load_table_from_uri( source_uris = [source_uris] destination = _table_arg_to_table_ref(destination, default_project=self.project) + + if job_config: + _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig) + load_job = job.LoadJob(job_ref, source_uris, destination, self, job_config) load_job._begin(retry=retry) @@ -1436,6 +1446,10 @@ def load_table_from_file( If ``size`` is not passed in and can not be determined, or if the ``file_obj`` can be detected to be a file opened in text mode. + + TypeError: + If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` + class. """ job_id = _make_job_id(job_id, job_id_prefix) @@ -1447,6 +1461,8 @@ def load_table_from_file( destination = _table_arg_to_table_ref(destination, default_project=self.project) job_ref = job._JobReference(job_id, project=project, location=location) + if job_config: + _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig) load_job = job.LoadJob(job_ref, None, destination, self, job_config) job_resource = load_job.to_api_repr() @@ -1545,16 +1561,22 @@ def load_table_from_dataframe( If a usable parquet engine cannot be found. This method requires :mod:`pyarrow` or :mod:`fastparquet` to be installed. + TypeError: + If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` + class. """ job_id = _make_job_id(job_id, job_id_prefix) - if job_config is None: - job_config = job.LoadJobConfig() - else: + if job_config: + _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig) # Make a copy so that the job config isn't modified in-place. 
job_config_properties = copy.deepcopy(job_config._properties) job_config = job.LoadJobConfig() job_config._properties = job_config_properties + + else: + job_config = job.LoadJobConfig() + job_config.source_format = job.SourceFormat.PARQUET if location is None: @@ -1700,14 +1722,21 @@ def load_table_from_json( Returns: google.cloud.bigquery.job.LoadJob: A new load job. + + Raises: + TypeError: + If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` + class. """ job_id = _make_job_id(job_id, job_id_prefix) - if job_config is None: - job_config = job.LoadJobConfig() - else: + if job_config: + _verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig) # Make a copy so that the job config isn't modified in-place. job_config = copy.deepcopy(job_config) + else: + job_config = job.LoadJobConfig() + job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON if job_config.schema is None: @@ -1900,6 +1929,11 @@ def copy_table( Returns: google.cloud.bigquery.job.CopyJob: A new copy job instance. + + Raises: + TypeError: + If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.CopyJobConfig` + class. """ job_id = _make_job_id(job_id, job_id_prefix) @@ -1928,6 +1962,8 @@ def copy_table( destination = _table_arg_to_table_ref(destination, default_project=self.project) + if job_config: + _verify_job_config_type(job_config, google.cloud.bigquery.job.CopyJobConfig) copy_job = job.CopyJob( job_ref, sources, destination, client=self, job_config=job_config ) @@ -1985,6 +2021,11 @@ def extract_table( Returns: google.cloud.bigquery.job.ExtractJob: A new extract job instance. + + Raises: + TypeError: + If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.ExtractJobConfig` + class. 
""" job_id = _make_job_id(job_id, job_id_prefix) @@ -2000,6 +2041,10 @@ def extract_table( if isinstance(destination_uris, six.string_types): destination_uris = [destination_uris] + if job_config: + _verify_job_config_type( + job_config, google.cloud.bigquery.job.ExtractJobConfig + ) extract_job = job.ExtractJob( job_ref, source, destination_uris, client=self, job_config=job_config ) @@ -2049,6 +2094,11 @@ def query( Returns: google.cloud.bigquery.job.QueryJob: A new query job instance. + + Raises: + TypeError: + If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.QueryJobConfig` + class. """ job_id = _make_job_id(job_id, job_id_prefix) @@ -2060,6 +2110,9 @@ def query( if self._default_query_job_config: if job_config: + _verify_job_config_type( + job_config, google.cloud.bigquery.job.QueryJobConfig + ) # anything that's not defined on the incoming # that is in the default, # should be filled in with the default @@ -2068,6 +2121,10 @@ def query( self._default_query_job_config ) else: + _verify_job_config_type( + self._default_query_job_config, + google.cloud.bigquery.job.QueryJobConfig, + ) job_config = self._default_query_job_config job_ref = job._JobReference(job_id, project=project, location=location) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index b8a367e17cb9..91b9bc642187 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -2973,7 +2973,7 @@ def test_list_jobs_w_parent_job_filter(self): conn.api_request.reset_mock() def test_load_table_from_uri(self): - from google.cloud.bigquery.job import LoadJob + from google.cloud.bigquery.job import LoadJob, LoadJobConfig JOB = "job_name" DESTINATION = "destination_table" @@ -2993,11 +2993,14 @@ def test_load_table_from_uri(self): } creds = _make_credentials() http = object() + job_config = LoadJobConfig() client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) conn = client._connection = 
make_connection(RESOURCE) destination = client.dataset(self.DS_ID).table(DESTINATION) - job = client.load_table_from_uri(SOURCE_URI, destination, job_id=JOB) + job = client.load_table_from_uri( + SOURCE_URI, destination, job_id=JOB, job_config=job_config + ) # Check that load_table_from_uri actually starts the job. conn.api_request.assert_called_once_with( @@ -3005,6 +3008,7 @@ def test_load_table_from_uri(self): ) self.assertIsInstance(job, LoadJob) + self.assertIsInstance(job._configuration, LoadJobConfig) self.assertIs(job._client, client) self.assertEqual(job.job_id, JOB) self.assertEqual(list(job.source_uris), [SOURCE_URI]) @@ -3100,6 +3104,26 @@ def test_load_table_from_uri_w_client_location(self): method="POST", path="/projects/other-project/jobs", data=resource ) + def test_load_table_from_uri_w_invalid_job_config(self): + from google.cloud.bigquery import job + + JOB = "job_name" + DESTINATION = "destination_table" + SOURCE_URI = "http://example.com/source.csv" + + creds = _make_credentials() + http = object() + job_config = job.CopyJobConfig() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + destination = client.dataset(self.DS_ID).table(DESTINATION) + + with self.assertRaises(TypeError) as exc: + client.load_table_from_uri( + SOURCE_URI, destination, job_id=JOB, job_config=job_config + ) + + self.assertIn("Expected an instance of LoadJobConfig", exc.exception.args[0]) + @staticmethod def _mock_requests_response(status_code, headers, content=b""): return mock.Mock( @@ -3422,6 +3446,66 @@ def test_copy_table_w_source_strings(self): ).table("destination_table") self.assertEqual(job.destination, expected_destination) + def test_copy_table_w_invalid_job_config(self): + from google.cloud.bigquery import job + + JOB = "job_name" + SOURCE = "source_table" + DESTINATION = "destination_table" + + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + job_config 
= job.ExtractJobConfig() + dataset = client.dataset(self.DS_ID) + source = dataset.table(SOURCE) + destination = dataset.table(DESTINATION) + with self.assertRaises(TypeError) as exc: + client.copy_table(source, destination, job_id=JOB, job_config=job_config) + + self.assertIn("Expected an instance of CopyJobConfig", exc.exception.args[0]) + + def test_copy_table_w_valid_job_config(self): + from google.cloud.bigquery.job import CopyJobConfig + + JOB = "job_name" + SOURCE = "source_table" + DESTINATION = "destination_table" + RESOURCE = { + "jobReference": {"projectId": self.PROJECT, "jobId": JOB}, + "configuration": { + "copy": { + "sourceTables": [ + { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": SOURCE, + } + ], + "destinationTable": { + "projectId": self.PROJECT, + "datasetId": self.DS_ID, + "tableId": DESTINATION, + }, + } + }, + } + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + job_config = CopyJobConfig() + conn = client._connection = make_connection(RESOURCE) + dataset = client.dataset(self.DS_ID) + source = dataset.table(SOURCE) + destination = dataset.table(DESTINATION) + + job = client.copy_table(source, destination, job_id=JOB, job_config=job_config) + # Check that copy_table actually starts the job. 
+ conn.api_request.assert_called_once_with( + method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE + ) + self.assertIsInstance(job._configuration, CopyJobConfig) + def test_extract_table(self): from google.cloud.bigquery.job import ExtractJob @@ -3462,6 +3546,24 @@ def test_extract_table(self): self.assertEqual(job.source, source) self.assertEqual(list(job.destination_uris), [DESTINATION]) + def test_extract_table_w_invalid_job_config(self): + from google.cloud.bigquery import job + + JOB = "job_id" + SOURCE = "source_table" + DESTINATION = "gs://bucket_name/object_name" + + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + dataset = client.dataset(self.DS_ID) + source = dataset.table(SOURCE) + job_config = job.LoadJobConfig() + with self.assertRaises(TypeError) as exc: + client.extract_table(source, DESTINATION, job_id=JOB, job_config=job_config) + + self.assertIn("Expected an instance of ExtractJobConfig", exc.exception.args[0]) + def test_extract_table_w_explicit_project(self): job_id = "job_id" source_id = "source_table" @@ -3745,6 +3847,35 @@ def test_query_w_explicit_job_config(self): method="POST", path="/projects/PROJECT/jobs", data=resource ) + def test_query_w_invalid_job_config(self): + from google.cloud.bigquery import QueryJobConfig, DatasetReference + from google.cloud.bigquery import job + + job_id = "some-job-id" + query = "select count(*) from persons" + creds = _make_credentials() + http = object() + default_job_config = QueryJobConfig() + default_job_config.default_dataset = DatasetReference( + self.PROJECT, "some-dataset" + ) + default_job_config.maximum_bytes_billed = 1000 + + client = self._make_one( + project=self.PROJECT, + credentials=creds, + _http=http, + default_query_job_config=default_job_config, + ) + + job_config = job.LoadJobConfig() + + with self.assertRaises(TypeError) as exc: + client.query( + query, job_id=job_id, 
location=self.LOCATION, job_config=job_config + ) + self.assertIn("Expected an instance of QueryJobConfig", exc.exception.args[0]) + def test_query_w_explicit_job_config_override(self): job_id = "some-job-id" query = "select count(*) from persons" @@ -3839,6 +3970,23 @@ def test_query_w_client_default_config_no_incoming(self): method="POST", path="/projects/PROJECT/jobs", data=resource ) + def test_query_w_invalid_default_job_config(self): + job_id = "some-job-id" + query = "select count(*) from persons" + creds = _make_credentials() + http = object() + default_job_config = object() + client = self._make_one( + project=self.PROJECT, + credentials=creds, + _http=http, + default_query_job_config=default_job_config, + ) + + with self.assertRaises(TypeError) as exc: + client.query(query, job_id=job_id, location=self.LOCATION) + self.assertIn("Expected an instance of QueryJobConfig", exc.exception.args[0]) + def test_query_w_client_location(self): job_id = "some-job-id" query = "select count(*) from persons" @@ -5419,6 +5567,19 @@ def test_load_table_from_file_bad_mode(self): with pytest.raises(ValueError): client.load_table_from_file(file_obj, self.TABLE_REF) + def test_load_table_from_file_w_invalid_job_config(self): + from google.cloud.bigquery import job + + client = self._make_client() + gzip_file = self._make_gzip_file_obj(writable=True) + config = job.QueryJobConfig() + with pytest.raises(TypeError) as exc: + client.load_table_from_file( + gzip_file, self.TABLE_REF, job_id="job_id", job_config=config + ) + err_msg = str(exc.value) + assert "Expected an instance of LoadJobConfig" in err_msg + @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_load_table_from_dataframe(self): @@ -6118,6 +6279,24 @@ def test_load_table_from_dataframe_w_nulls(self): assert sent_config.schema == schema assert sent_config.source_format == job.SourceFormat.PARQUET + @unittest.skipIf(pandas is None, "Requires `pandas`") 
+ def test_load_table_from_dataframe_w_invalid_job_config(self): + from google.cloud.bigquery import job + + client = self._make_client() + + records = [{"float_column": 3.14, "struct_column": [{"foo": 1}, {"bar": -1}]}] + dataframe = pandas.DataFrame(data=records) + job_config = job.CopyJobConfig() + + with pytest.raises(TypeError) as exc: + client.load_table_from_dataframe( + dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION + ) + + err_msg = str(exc.value) + assert "Expected an instance of LoadJobConfig" in err_msg + def test_load_table_from_json_basic_use(self): from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES from google.cloud.bigquery import job @@ -6206,6 +6385,26 @@ def test_load_table_from_json_non_default_args(self): # all properties should have been cloned and sent to the backend assert sent_config._properties.get("load", {}).get("unknown_field") == "foobar" + def test_load_table_from_json_w_invalid_job_config(self): + from google.cloud.bigquery import job + + client = self._make_client() + json_rows = [ + {"name": "One", "age": 11, "birthday": "2008-09-10", "adult": False}, + {"name": "Two", "age": 22, "birthday": "1997-08-09", "adult": True}, + ] + job_config = job.CopyJobConfig() + with pytest.raises(TypeError) as exc: + client.load_table_from_json( + json_rows, + self.TABLE_REF, + job_config=job_config, + project="project-x", + location="EU", + ) + err_msg = str(exc.value) + assert "Expected an instance of LoadJobConfig" in err_msg + # Low-level tests @classmethod