refactor(bigquery): update code samples of load table file and uri (#10175)

* refactor(bigquery): update code samples of load table file and uri

* refactor(bigquery): add uri for load orc and avro data

* refactor(bigquery): fix lint and docs

* refactor(bigquery): update copyright to 2020
HemangChothani authored Feb 3, 2020
1 parent 1ebe370 commit 10dd74a
Showing 17 changed files with 505 additions and 268 deletions.
263 changes: 0 additions & 263 deletions bigquery/docs/snippets.py
@@ -581,269 +581,6 @@ def test_manage_views(client, to_delete):
    # [END bigquery_grant_view_access]


def test_load_table_from_file(client, to_delete):
    """Upload table data from a CSV file."""
    dataset_id = "load_table_from_file_dataset_{}".format(_millis())
    table_id = "load_table_from_file_table_{}".format(_millis())
    dataset = bigquery.Dataset(client.dataset(dataset_id))
    dataset.location = "US"
    client.create_dataset(dataset)
    to_delete.append(dataset)
    snippets_dir = os.path.abspath(os.path.dirname(__file__))
    filename = os.path.join(
        snippets_dir, "..", "..", "bigquery", "tests", "data", "people.csv"
    )

    # [START bigquery_load_from_file]
    # from google.cloud import bigquery
    # client = bigquery.Client()
    # filename = '/path/to/file.csv'
    # dataset_id = 'my_dataset'
    # table_id = 'my_table'

    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1
    job_config.autodetect = True

    with open(filename, "rb") as source_file:
        job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

    job.result()  # Waits for table load to complete.

    print("Loaded {} rows into {}:{}.".format(job.output_rows, dataset_id, table_id))
    # [END bigquery_load_from_file]

    table = client.get_table(table_ref)
    rows = list(client.list_rows(table))  # API request

    assert len(rows) == 2
    # Order is not preserved, so compare individually
    row1 = bigquery.Row(("Wylma Phlyntstone", 29), {"full_name": 0, "age": 1})
    assert row1 in rows
    row2 = bigquery.Row(("Phred Phlyntstone", 32), {"full_name": 0, "age": 1})
    assert row2 in rows


def test_load_table_from_uri_avro(client, to_delete, capsys):
    dataset_id = "load_table_from_uri_avro_{}".format(_millis())
    dataset = bigquery.Dataset(client.dataset(dataset_id))
    client.create_dataset(dataset)
    to_delete.append(dataset)

    # [START bigquery_load_table_gcs_avro]
    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'

    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.AVRO
    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.avro"

    load_job = client.load_table_from_uri(
        uri, dataset_ref.table("us_states"), job_config=job_config
    )  # API request
    print("Starting job {}".format(load_job.job_id))

    load_job.result()  # Waits for table load to complete.
    print("Job finished.")

    destination_table = client.get_table(dataset_ref.table("us_states"))
    print("Loaded {} rows.".format(destination_table.num_rows))
    # [END bigquery_load_table_gcs_avro]

    out, _ = capsys.readouterr()
    assert "Loaded 50 rows." in out


def test_load_table_from_uri_csv(client, to_delete, capsys):
    dataset_id = "load_table_from_uri_csv_{}".format(_millis())
    dataset = bigquery.Dataset(client.dataset(dataset_id))
    client.create_dataset(dataset)
    to_delete.append(dataset)

    # [START bigquery_load_table_gcs_csv]
    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'

    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ]
    job_config.skip_leading_rows = 1
    # The source format defaults to CSV, so the line below is optional.
    job_config.source_format = bigquery.SourceFormat.CSV
    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"

    load_job = client.load_table_from_uri(
        uri, dataset_ref.table("us_states"), job_config=job_config
    )  # API request
    print("Starting job {}".format(load_job.job_id))

    load_job.result()  # Waits for table load to complete.
    print("Job finished.")

    destination_table = client.get_table(dataset_ref.table("us_states"))
    print("Loaded {} rows.".format(destination_table.num_rows))
    # [END bigquery_load_table_gcs_csv]

    out, _ = capsys.readouterr()
    assert "Loaded 50 rows." in out


def test_load_table_from_uri_json(client, to_delete, capsys):
    dataset_id = "load_table_from_uri_json_{}".format(_millis())
    dataset = bigquery.Dataset(client.dataset(dataset_id))
    dataset.location = "US"
    client.create_dataset(dataset)
    to_delete.append(dataset)

    # [START bigquery_load_table_gcs_json]
    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'

    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ]
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.json"

    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table("us_states"),
        location="US",  # Location must match that of the destination dataset.
        job_config=job_config,
    )  # API request
    print("Starting job {}".format(load_job.job_id))

    load_job.result()  # Waits for table load to complete.
    print("Job finished.")

    destination_table = client.get_table(dataset_ref.table("us_states"))
    print("Loaded {} rows.".format(destination_table.num_rows))
    # [END bigquery_load_table_gcs_json]

    out, _ = capsys.readouterr()
    assert "Loaded 50 rows." in out


def test_load_table_from_uri_cmek(client, to_delete):
    dataset_id = "load_table_from_uri_cmek_{}".format(_millis())
    dataset = bigquery.Dataset(client.dataset(dataset_id))
    dataset.location = "US"
    client.create_dataset(dataset)
    to_delete.append(dataset)

    # [START bigquery_load_table_gcs_json_cmek]
    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'

    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON

    # Set the encryption key to use for the destination.
    # TODO: Replace this key with a key you have created in KMS.
    kms_key_name = "projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}".format(
        "cloud-samples-tests", "us", "test", "test"
    )
    encryption_config = bigquery.EncryptionConfiguration(kms_key_name=kms_key_name)
    job_config.destination_encryption_configuration = encryption_config
    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.json"

    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table("us_states"),
        location="US",  # Location must match that of the destination dataset.
        job_config=job_config,
    )  # API request

    assert load_job.job_type == "load"

    load_job.result()  # Waits for table load to complete.

    assert load_job.state == "DONE"
    table = client.get_table(dataset_ref.table("us_states"))
    assert table.encryption_configuration.kms_key_name == kms_key_name
    # [END bigquery_load_table_gcs_json_cmek]
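The encryption docs below now include samples/load_table_uri_cmek.py instead of this snippet. A sketch of such a sample with the Cloud KMS key passed in rather than hard-coded; the parameter names and final check are assumptions:

def load_table_uri_cmek(table_id, kms_key_name):
    # Sketch only; the actual sample added by the PR may differ.
    # [START bigquery_load_table_gcs_json_cmek]
    from google.cloud import bigquery

    # Construct a BigQuery client object.
    client = bigquery.Client()

    # TODO(developer): Set table_id to the ID of the table to create.
    # table_id = "your-project.your_dataset.your_table_name"

    # TODO(developer): Set kms_key_name to the resource name of a Cloud KMS key.
    # kms_key_name = "projects/PROJECT/locations/LOCATION/keyRings/RING/cryptoKeys/KEY"

    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        # Set the encryption key to use for the destination table.
        destination_encryption_configuration=bigquery.EncryptionConfiguration(
            kms_key_name=kms_key_name
        ),
    )
    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.json"

    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
    load_job.result()  # Waits for the job to complete.

    table = client.get_table(table_id)
    if table.encryption_configuration.kms_key_name == kms_key_name:
        print("Table loaded with the customer-managed encryption key.")
    # [END bigquery_load_table_gcs_json_cmek]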


def test_load_table_from_uri_parquet(client, to_delete, capsys):
    dataset_id = "load_table_from_uri_parquet_{}".format(_millis())
    dataset = bigquery.Dataset(client.dataset(dataset_id))
    client.create_dataset(dataset)
    to_delete.append(dataset)

    # [START bigquery_load_table_gcs_parquet]
    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'

    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.PARQUET
    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"

    load_job = client.load_table_from_uri(
        uri, dataset_ref.table("us_states"), job_config=job_config
    )  # API request
    print("Starting job {}".format(load_job.job_id))

    load_job.result()  # Waits for table load to complete.
    print("Job finished.")

    destination_table = client.get_table(dataset_ref.table("us_states"))
    print("Loaded {} rows.".format(destination_table.num_rows))
    # [END bigquery_load_table_gcs_parquet]

    out, _ = capsys.readouterr()
    assert "Loaded 50 rows." in out


def test_load_table_from_uri_orc(client, to_delete, capsys):
    dataset_id = "load_table_from_uri_orc_{}".format(_millis())
    dataset = bigquery.Dataset(client.dataset(dataset_id))
    client.create_dataset(dataset)
    to_delete.append(dataset)

    # [START bigquery_load_table_gcs_orc]
    # from google.cloud import bigquery
    # client = bigquery.Client()
    # dataset_id = 'my_dataset'

    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.ORC
    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.orc"

    load_job = client.load_table_from_uri(
        uri, dataset_ref.table("us_states"), job_config=job_config
    )  # API request
    print("Starting job {}".format(load_job.job_id))

    load_job.result()  # Waits for table load to complete.
    print("Job finished.")

    destination_table = client.get_table(dataset_ref.table("us_states"))
    print("Loaded {} rows.".format(destination_table.num_rows))
    # [END bigquery_load_table_gcs_orc]

    out, _ = capsys.readouterr()
    assert "Loaded 50 rows." in out


def test_load_table_from_uri_autodetect(client, to_delete, capsys):
    """Load table from a GCS URI using various formats and auto-detected schema
    Each file format has its own tested load from URI sample. Because most of
2 changes: 1 addition & 1 deletion bigquery/docs/usage/encryption.rst
@@ -27,7 +27,7 @@ Change the key used to encrypt a table.
Load a file from Cloud Storage, using a customer-managed encryption key from
Cloud KMS for the destination table.

.. literalinclude:: ../snippets.py
.. literalinclude:: ../samples/load_table_uri_cmek.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_load_table_gcs_json_cmek]
30 changes: 26 additions & 4 deletions bigquery/docs/usage/tables.rst
@@ -70,7 +70,7 @@ Create an integer range partitioned table with the
Load table data from a file with the
:func:`~google.cloud.bigquery.client.Client.load_table_from_file` method:

.. literalinclude:: ../snippets.py
.. literalinclude:: ../samples/load_table_file.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_load_from_file]
@@ -79,7 +79,7 @@ Load table data from the file with the
Load a CSV file from Cloud Storage with the
:func:`~google.cloud.bigquery.client.Client.load_table_from_uri` method:

.. literalinclude:: ../snippets.py
.. literalinclude:: ../samples/load_table_uri_csv.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_load_table_gcs_csv]
@@ -90,7 +90,7 @@ See also: `Loading CSV data from Cloud Storage

Load a JSON file from Cloud Storage:

.. literalinclude:: ../snippets.py
.. literalinclude:: ../samples/load_table_uri_json.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_load_table_gcs_json]
@@ -101,7 +101,7 @@ See also: `Loading JSON data from Cloud Storage

Load a Parquet file from Cloud Storage:

.. literalinclude:: ../snippets.py
.. literalinclude:: ../samples/load_table_uri_parquet.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_load_table_gcs_parquet]
@@ -110,6 +110,28 @@ Load a Parquet file from Cloud Storage:
See also: `Loading Parquet data from Cloud Storage
<https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet>`_.

Load an Avro file from Cloud Storage:

.. literalinclude:: ../samples/load_table_uri_avro.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_load_table_gcs_avro]
   :end-before: [END bigquery_load_table_gcs_avro]

See also: `Loading Avro data from Cloud Storage
<https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro>`_.

Load an ORC file from Cloud Storage:

.. literalinclude:: ../samples/load_table_uri_orc.py
   :language: python
   :dedent: 4
   :start-after: [START bigquery_load_table_gcs_orc]
   :end-before: [END bigquery_load_table_gcs_orc]

See also: `Loading ORC data from Cloud Storage
<https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-orc>`_.

Updating a Table
^^^^^^^^^^^^^^^^

43 changes: 43 additions & 0 deletions bigquery/samples/load_table_file.py
@@ -0,0 +1,43 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def load_table_file(file_path, table_id):

    # [START bigquery_load_from_file]
    from google.cloud import bigquery

    # Construct a BigQuery client object.
    client = bigquery.Client()

    # TODO(developer): Set table_id to the ID of the table to create.
    # table_id = "your-project.your_dataset.your_table_name"

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV, skip_leading_rows=1, autodetect=True,
    )

    with open(file_path, "rb") as source_file:
        job = client.load_table_from_file(source_file, table_id, job_config=job_config)

    job.result()  # Waits for the job to complete.

    table = client.get_table(table_id)  # Make an API request.
    print(
        "Loaded {} rows and {} columns to {}".format(
            table.num_rows, len(table.schema), table_id
        )
    )
    # [END bigquery_load_from_file]
    return table
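The remaining sample and test files in this PR are not shown in this truncated view. For orientation, a hypothetical direct invocation of the new function; the import path, file path, and table ID below are placeholders, not values from this PR:

from load_table_file import load_table_file  # assumed import path

# Placeholders; substitute a real CSV file and a table in an existing dataset.
table = load_table_file(
    file_path="/path/to/people.csv",
    table_id="your-project.your_dataset.your_table_name",
)
print("{} rows now in {}".format(table.num_rows, table.full_table_id))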
