Add documentation and types to ReadFrom/WriteToBigQuery. #31537

Merged · 3 commits · Jun 10, 2024
58 changes: 53 additions & 5 deletions sdks/python/apache_beam/yaml/yaml_io.py
@@ -94,11 +94,28 @@ def write_to_text(pcoll, path: str):


def read_from_bigquery(
query=None, table=None, row_restriction=None, fields=None):
*,
table: Optional[str] = None,
query: Optional[str] = None,
row_restriction: Optional[str] = None,
fields: Optional[Iterable[str]] = None):
"""Reads data from BigQuery.

Exactly one of table or query must be set.
If query is set, neither row_restriction nor fields should be set.

Args:
table (str): The table to read from, specified as `DATASET.TABLE`
or `PROJECT:DATASET.TABLE`.
query (str): A query to be used instead of the table argument.
row_restriction (str): Optional SQL text filtering statement, similar to a
WHERE clause in a query. Aggregates are not supported. Restricted to a
maximum length of 1 MB.
fields (List[str]): Optional list of names of the fields in the
table that should be read. If empty, all fields will be read. If a
specified field is a nested field, all of its sub-fields will be
selected. The output field order is unrelated to the order of fields
given here.
"""
if query is None:
assert table is not None
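
As a usage note: a minimal sketch of applying this wrapper directly from Python, assuming it returns a standard PTransform around ReadFromBigQuery (which the YAML provider machinery applies for you); the table and field names below are hypothetical placeholders.

import apache_beam as beam
from apache_beam.yaml.yaml_io import read_from_bigquery

with beam.Pipeline() as p:
  rows = (
      p
      | read_from_bigquery(
          table='my_project:my_dataset.users',  # exactly one of table/query
          row_restriction="state = 'WA'",  # only meaningful with table
          fields=['name', 'state']))  # omit to read all fields
  rows | beam.Map(print)
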
@@ -114,12 +131,43 @@ def read_from_bigquery(


def write_to_bigquery(
table,
table: Optional[str],
*,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND,
create_disposition: str = BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition: str = BigQueryDisposition.WRITE_APPEND,
error_handling=None):
"""Writes data to a BigQuery table."""
f"""Writes data to a BigQuery table.

Args:
table (str): The table to write to, specified as `DATASET.TABLE`
or `PROJECT:DATASET.TABLE`.
create_disposition (BigQueryDisposition): A string describing what
happens if the table does not exist. Possible values are:

* :attr:`{BigQueryDisposition.CREATE_IF_NEEDED}`: create the table if it
does not exist.
* :attr:`{BigQueryDisposition.CREATE_NEVER}`: fail the write if the table
does not exist.

Defaults to `{BigQueryDisposition.CREATE_IF_NEEDED}`.

write_disposition (BigQueryDisposition): A string describing what happens
if the table already contains data. Possible values are:

* :attr:`{BigQueryDisposition.WRITE_TRUNCATE}`: delete existing rows.
* :attr:`{BigQueryDisposition.WRITE_APPEND}`: add to existing rows.
* :attr:`{BigQueryDisposition.WRITE_EMPTY}`: fail the write if the table is
not empty.

For streaming pipelines, `{BigQueryDisposition.WRITE_TRUNCATE}` cannot be used.

Defaults to `{BigQueryDisposition.WRITE_APPEND}`.

error_handling: If specified, should be a mapping giving an output into
which to emit records that failed to be written to BigQuery, as
described at https://beam.apache.org/documentation/sdks/yaml-errors/.
Otherwise, permanently failing records will cause pipeline failure.
"""
class WriteToBigQueryHandlingErrors(beam.PTransform):
def default_label(self):
return 'WriteToBigQuery'
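
A companion sketch for the write path, under the same assumptions: the destination table is a hypothetical placeholder, and input elements are assumed to be schema'd rows (beam.Row), as Beam YAML pipelines produce. The string disposition values below equal the BigQueryDisposition constants they name.

import apache_beam as beam
from apache_beam.yaml.yaml_io import write_to_bigquery

with beam.Pipeline() as p:
  (p
   | beam.Create([beam.Row(name='Ada', state='WA')])
   | write_to_bigquery(
       'my_project:my_dataset.users',  # hypothetical destination table
       create_disposition='CREATE_IF_NEEDED',  # default: create if missing
       write_disposition='WRITE_APPEND'))  # default: append to existing rows

Passing error_handling (for example, a mapping naming an error output, as described on the linked YAML errors page) routes permanently failing records to that named output instead of failing the pipeline.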