Commit

[YAML] Add and cleanup documentation for several builtin transforms. (#…
robertwb authored Dec 9, 2023
1 parent 98a2690 commit 4891a81
Showing 7 changed files with 149 additions and 22 deletions.

@@ -36,6 +36,11 @@ public interface SchemaTransformProvider {
/** Returns an id that uniquely represents this transform. */
String identifier();

/** Returns a description of this transform to be used for documentation. */
default String description() {
return "";
}

/**
* Returns the expected schema of the configuration object. Note this is distinct from the schema
* of the transform itself.

@@ -767,6 +767,7 @@ DiscoverSchemaTransformResponse discover(DiscoverSchemaTransformRequest request)
transformProvider.getAllProviders()) {
SchemaTransformConfig.Builder schemaTransformConfigBuilder =
SchemaTransformConfig.newBuilder();
schemaTransformConfigBuilder.setDescription(provider.description());
schemaTransformConfigBuilder.setConfigSchema(
SchemaTranslation.schemaToProto(provider.configurationSchema(), true));
schemaTransformConfigBuilder.addAllInputPcollectionNames(provider.inputCollectionNames());
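
The description set by the code above is carried in the expansion service's discovery response. As a rough, hedged sketch (not part of this commit), a Python client could read it back roughly as follows; the service address and the `schema_transform_configs`/`description` field names are assumptions inferred from the surrounding proto usage.

    # Hedged sketch: ask a running expansion service for its schema transforms
    # and print the newly populated description field.
    import grpc

    from apache_beam.portability.api import beam_expansion_api_pb2
    from apache_beam.portability.api import beam_expansion_api_pb2_grpc

    # Assumes an expansion service is already listening on this port.
    with grpc.insecure_channel('localhost:8097') as channel:
      stub = beam_expansion_api_pb2_grpc.ExpansionServiceStub(channel)
      response = stub.DiscoverSchemaTransform(
          beam_expansion_api_pb2.DiscoverSchemaTransformRequest())
      for identifier, config in response.schema_transform_configs.items():
        print(identifier, '-', config.description)
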
@@ -71,6 +71,21 @@ public String identifier() {
return "schematransform:org.apache.beam:sql_transform:v1";
}

@Override
public String description() {
return "A transform that executes a SQL query on its input PCollections.\n\n"
+ "If a single input is given, it may be referred to as `PCOLLECTION`, e.g. the query could be of the form"
+ "\n\n"
+ " SELECT a, sum(b) FROM PCOLLECTION"
+ "\n\n"
+ "If multiple inputs are given, the should be named as they are in the query, e.g."
+ "\n\n"
+ " SELECT a, b, c FROM pcoll_1 join pcoll_2 using (b)"
+ "\n\n"
+ "For more details about Beam SQL in general see "
+ "[the Beam SQL documentation](https://beam.apache.org/documentation/dsls/sql/overview/).";
}

@Override
public Schema configurationSchema() {
List<String> providers = new ArrayList<>();
@@ -82,7 +97,7 @@ public Schema configurationSchema() {
EnumerationType providerEnum = EnumerationType.create(providers);

return Schema.of(
Schema.Field.of("query", Schema.FieldType.STRING),
Schema.Field.of("query", Schema.FieldType.STRING).withDescription("SQL query to execute"),
Schema.Field.nullable(
"ddl", Schema.FieldType.STRING), // TODO: Underlying builder seems more capable?
Schema.Field.nullable("dialect", Schema.FieldType.logicalType(QUERY_ENUMERATION)),
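
To make the documented query forms concrete, here is a hedged sketch (not part of this diff) of equivalent usage through the Python SqlTransform wrapper; it needs the Java expansion service that backs Beam SQL, and the column and alias names are illustrative.

    # Hedged sketch of the single-input and multi-input query forms described
    # in the description() string above.
    import apache_beam as beam
    from apache_beam.transforms.sql import SqlTransform

    with beam.Pipeline() as p:
      rows = p | beam.Create(
          [beam.Row(a='x', b=1), beam.Row(a='x', b=2), beam.Row(a='y', b=3)])

      # A single input is referred to as PCOLLECTION inside the query.
      totals = rows | 'Totals' >> SqlTransform(
          'SELECT a, sum(b) AS total FROM PCOLLECTION GROUP BY a')

      # Multiple inputs are named as they are used in the query.
      joined = {'pcoll_1': rows, 'pcoll_2': totals} | 'Join' >> SqlTransform(
          'SELECT a, b, total FROM pcoll_1 JOIN pcoll_2 USING (a)')
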
22 changes: 20 additions & 2 deletions sdks/python/apache_beam/transforms/external.py
@@ -41,6 +41,7 @@
from apache_beam.portability.api import beam_expansion_api_pb2_grpc
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability.api import external_transforms_pb2
from apache_beam.portability.api import schema_pb2
from apache_beam.runners import pipeline_context
from apache_beam.runners.portability import artifact_service
from apache_beam.transforms import environments
@@ -51,6 +52,7 @@
from apache_beam.typehints.schemas import named_fields_to_schema
from apache_beam.typehints.schemas import named_tuple_from_schema
from apache_beam.typehints.schemas import named_tuple_to_schema
from apache_beam.typehints.schemas import typing_from_runner_api
from apache_beam.typehints.trivial_inference import instance_to_type
from apache_beam.typehints.typehints import Union
from apache_beam.typehints.typehints import UnionConstraint
@@ -450,8 +452,24 @@ def discover_iter(expansion_service, ignore_errors=True):
schema = named_tuple_from_schema(proto_config.config_schema)
except Exception as exn:
if ignore_errors:
logging.info("Bad schema for %s: %s", identifier, str(exn)[:250])
continue
truncated_schema = schema_pb2.Schema()
truncated_schema.CopyFrom(proto_config.config_schema)
for field in truncated_schema.fields:
try:
typing_from_runner_api(field.type)
except Exception:
if field.type.nullable:
# Set it to an empty placeholder type.
field.type.CopyFrom(
schema_pb2.FieldType(
nullable=True,
row_type=schema_pb2.RowType(
schema=schema_pb2.Schema())))
try:
schema = named_tuple_from_schema(truncated_schema)
except Exception as exn:
logging.info("Bad schema for %s: %s", identifier, str(exn)[:250])
continue
else:
raise

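
For context on the fallback above: the placeholder is just a nullable, empty Row type, which the Python schema mapping can always handle, so a single unmappable field no longer hides the whole transform from discovery. A hedged, self-contained sketch (the field names are illustrative, not taken from this commit):

    # Hedged sketch: a schema whose 'dialect' field has been degraded to the
    # same nullable empty-Row placeholder used above still converts cleanly.
    import uuid

    from apache_beam.portability.api import schema_pb2
    from apache_beam.typehints.schemas import named_tuple_from_schema

    placeholder = schema_pb2.FieldType(
        nullable=True,
        row_type=schema_pb2.RowType(schema=schema_pb2.Schema()))

    truncated = schema_pb2.Schema(
        id=str(uuid.uuid4()),
        fields=[
            schema_pb2.Field(
                name='query',
                type=schema_pb2.FieldType(
                    atomic_type=schema_pb2.AtomicType.STRING)),
            # Stand-in for a field whose real type (e.g. a Java logical type)
            # has no Python equivalent:
            schema_pb2.Field(name='dialect', type=placeholder),
        ])

    ConfigRow = named_tuple_from_schema(truncated)  # succeeds despite 'dialect'
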
14 changes: 5 additions & 9 deletions sdks/python/apache_beam/yaml/standard_providers.yaml
@@ -18,23 +18,18 @@
# TODO(robertwb): Add more providers.
# TODO(robertwb): Perhaps auto-generate this file?

- type: 'beamJar'
config:
gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar'
version: BEAM_VERSION
transforms:
Sql: 'beam:external:java:sql:v1'
MapToFields-java: "beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1"
MapToFields-generic: "beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1"

- type: renaming
transforms:
'Sql': 'Sql'
'MapToFields-java': 'MapToFields-java'
'MapToFields-generic': 'MapToFields-java'
'Filter-java': 'Filter-java'
'Explode': 'Explode'
config:
mappings:
'Sql':
query: 'query'
# Unfortunately dialect is a java logical type.
'MapToFields-generic':
language: 'language'
append: 'append'
@@ -57,6 +52,7 @@
underlying_provider:
type: beamJar
transforms:
Sql: "schematransform:org.apache.beam:sql_transform:v1"
MapToFields-java: "beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1"
Filter-java: "beam:schematransform:org.apache.beam:yaml:filter-java:v1"
Explode: "beam:schematransform:org.apache.beam:yaml:explode:v1"
22 changes: 22 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
@@ -326,6 +326,28 @@ def expand(pcoll, error_handling=None, **kwargs):


class _Explode(beam.PTransform):
"""Explodes (aka unnest/flatten) one or more fields producing multiple rows.
Given one or more fields of iterable type, produces multiple rows, one for
each value of that field. For example, a row of the form `('a', [1, 2, 3])`
would expand to `('a', 1)`, `('a', 2)`, and `('a', 3)` when exploded on
the second field.
This is akin to a `FlatMap` when paired with the MapToFields transform.
Args:
fields: The list of fields to expand.
cross_product: If multiple fields are specified, indicates whether the
full cross-product of combinations should be produced, or if the
first element of the first field corresponds to the first element
of the second field, etc. For example, the row
`(['a', 'b'], [1, 2])` would expand to the four rows
`('a', 1)`, `('a', 2)`, `('b', 1)`, and `('b', 2)` when
`cross_product` is set to `true` but only the two rows
`('a', 1)` and `('b', 2)` when it is set to `false`.
Only meaningful (and required) if multiple fields are specified.
error_handling: Whether and how to handle errors during iteration.
"""
def __init__(
self,
fields: Union[str, Collection[str]],
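
As a rough illustration of the behavior documented in the _Explode docstring above (not part of this diff), the cross-product case can be written with a plain FlatMap:

    # Hedged sketch: cross-product explode of two list fields. With
    # cross_product=False the two lists would instead be zipped element by
    # element, yielding only ('a', 1) and ('b', 2).
    import apache_beam as beam

    def explode_cross_product(row):
      for letter in row.letters:
        for number in row.numbers:
          yield beam.Row(letters=letter, numbers=number)

    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create([beam.Row(letters=['a', 'b'], numbers=[1, 2])])
          | beam.FlatMap(explode_cross_product)  # four rows: a/1, a/2, b/1, b/2
          | beam.Map(print))
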
90 changes: 80 additions & 10 deletions sdks/python/apache_beam/yaml/yaml_provider.py
@@ -546,7 +546,7 @@ def dicts_to_rows(o):


def create_builtin_provider():
def create(elements: Iterable[Any], reshuffle: bool = True):
def create(elements: Iterable[Any], reshuffle: Optional[bool] = True):
"""Creates a collection containing a specified set of elements.
YAML/JSON-style mappings will be interpreted as Beam rows. For example::
@@ -560,17 +560,48 @@ def create(elements: Iterable[Any], reshuffle: bool = True):
Args:
elements: The set of elements that should belong to the PCollection.
YAML/JSON-style mappings will be interpreted as Beam rows.
reshuffle (optional): Whether to introduce a reshuffle if there is more
than one element in the collection. Defaults to True.
reshuffle (optional): Whether to introduce a reshuffle (to possibly
redistribute the work) if there is more than one element in the
collection. Defaults to True.
"""
return beam.Create([element_to_rows(e) for e in elements], reshuffle)
return beam.Create([element_to_rows(e) for e in elements],
reshuffle=reshuffle is not False)
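
A hedged sketch (not part of this diff) of roughly what the Create provider above produces for two mapping-style elements, with the default reshuffle left enabled:

    # Hedged sketch: elements become Beam rows; reshuffle=True redistributes
    # them across workers when there is more than one element.
    import apache_beam as beam

    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create(
              [beam.Row(first=0, second='foo'), beam.Row(first=1, second='bar')],
              reshuffle=True)
          | beam.Map(print))
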

# Or should this be posargs, args?
# pylint: disable=dangerous-default-value
def fully_qualified_named_transform(
constructor: str,
args: Iterable[Any] = (),
kwargs: Mapping[str, Any] = {}):
args: Optional[Iterable[Any]] = (),
kwargs: Optional[Mapping[str, Any]] = {}):
"""A Python PTransform identified by fully qualified name.
This allows one to import, construct, and apply any Beam Python transform.
This can be useful for using transforms that have not yet been exposed
via a YAML interface. Note, however, that conversion may be required if this
transform does not accept or produce Beam Rows.
For example,
type: PyTransform
config:
constructor: apache_beam.pkg.mod.SomeClass
args: [1, 'foo']
kwargs:
baz: 3
can be used to access the transform
`apache_beam.pkg.mod.SomeClass(1, 'foo', baz=3)`.
Args:
constructor: Fully qualified name of a callable used to construct the
transform. Often this is a class such as
`apache_beam.pkg.mod.SomeClass` but it can also be a function or
any other callable that returns a PTransform.
args: A list of parameters to pass to the callable as positional
arguments.
kwargs: A mapping of parameters to pass to the callable as keyword
arguments.
"""
with FullyQualifiedNamedTransform.with_filter('*'):
return constructor >> FullyQualifiedNamedTransform(
constructor, args, kwargs)
@@ -579,6 +610,19 @@ def fully_qualified_named_transform(
# exactly zero or one PCollection in yaml (as they would be interpreted as
# PBegin and the PCollection itself respectively).
class Flatten(beam.PTransform):
"""Flattens multiple PCollections into a single PCollection.
The elements of the resulting PCollection will be the (disjoint) union of
all the elements of all the inputs.
Note that in YAML, transforms can always take a list of inputs, which will
be implicitly flattened.
"""
def __init__(self):
# Suppress the "label" argument from the superclass for better docs.
# pylint: disable=useless-parent-delegation
super().__init__()

def expand(self, pcolls):
if isinstance(pcolls, beam.PCollection):
pipeline_arg = {}
@@ -592,6 +636,24 @@ def expand(self, pcolls):
return pcolls | beam.Flatten(**pipeline_arg)
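
For reference, a hedged sketch (not part of this diff) of the documented disjoint-union behavior using beam.Flatten directly:

    # Hedged sketch: the merged collection contains every element of every
    # input, here 1, 2, 3 and 4.
    import apache_beam as beam

    with beam.Pipeline() as p:
      first = p | 'One' >> beam.Create([1, 2])
      second = p | 'Two' >> beam.Create([3, 4])
      merged = (first, second) | beam.Flatten()
      _ = merged | beam.Map(print)
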

class WindowInto(beam.PTransform):
# pylint: disable=line-too-long

"""A window transform assigning windows to each element of a PCollection.
The assigned windows will affect all downstream aggregating operations,
which will aggregate by window as well as by key.
See [the Beam documentation on windowing](https://beam.apache.org/documentation/programming-guide/#windowing)
for more details.
Note that any YAML transform can have a
[windowing parameter](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/README.md#windowing),
which is applied to its inputs (if any) or outputs (if there are no inputs),
so explicit WindowInto operations are typically not needed.
Args:
windowing: the type and parameters of the windowing to perform
"""
def __init__(self, windowing):
self._window_transform = self._parse_window_spec(windowing)

@@ -617,13 +679,21 @@ def _parse_window_spec(spec):
# TODO: Triggering, etc.
return beam.WindowInto(window_fn)
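
A hedged sketch (not part of this diff) of how the assigned windows affect a downstream aggregation, using 60-second fixed windows as an example:

    # Hedged sketch: the two 'a' elements land in different 60-second windows
    # (timestamps 0 and 70), so CombinePerKey emits one sum per key per window
    # rather than a single combined sum.
    import apache_beam as beam
    from apache_beam.transforms import window

    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create([('a', 1, 0.0), ('a', 2, 70.0)])
          | beam.Map(lambda e: window.TimestampedValue((e[0], e[1]), e[2]))
          | beam.WindowInto(window.FixedWindows(60))
          | beam.CombinePerKey(sum)  # emits ('a', 1) and ('a', 2)
          | beam.Map(print))
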

def log_and_return(x):
logging.info(x)
return x
def LogForTesting():
"""Logs each element of its input PCollection.
The output of this transform is a copy of its input for ease of use in
chain-style pipelines.
"""
def log_and_return(x):
logging.info(x)
return x

return beam.Map(log_and_return)

return InlineProvider({
'Create': create,
'LogForTesting': lambda: beam.Map(log_and_return),
'LogForTesting': LogForTesting,
'PyTransform': fully_qualified_named_transform,
'Flatten': Flatten,
'WindowInto': WindowInto,
