Skip to content

Commit

Permalink
add ExternalTransformProvider example (#30666)
Browse files Browse the repository at this point in the history
* add ExternalTransformProvider example

* cleanup old command

* clarify docs

* update

* indicate JavaExternalTransform is outdated
  • Loading branch information
ahmedabu98 authored Aug 29, 2024
1 parent df83fbd commit 2ac5594
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 0 deletions.
3 changes: 3 additions & 0 deletions examples/multi-language/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ This project provides examples of Apache Beam

## Using Java transforms from Python

* **python/wordcount_external** - A Python pipeline that runs the Word Count workflow using three external Java
SchemaTransforms. This example demonstrates the updated `ExternalTransformProvider` API.
#### _Outdated examples:_
* **python/addprefix** - A Python pipeline that reads a text file and attaches a prefix on the Java side to each input.
* **python/javacount** - A Python pipeline that counts words using the Java `Count.perElement()` transform.
* **python/javadatagenerator** - A Python pipeline that produces a set of strings generated from Java.
Expand Down
118 changes: 118 additions & 0 deletions examples/multi-language/python/wordcount_external.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
from apache_beam.typehints.row_type import RowTypeConstraint
"""A Python multi-language pipeline that counts words using multiple Java SchemaTransforms.
This pipeline reads an input text file then extracts the words, counts them, and writes the results Java
SchemaTransforms. The transforms are listed below and can be found in
src/main/java/org/apache/beam/examples/schematransforms/:
- `ExtractWordsProvider`
- `JavaCountProvider`
- `WriteWordsProvider`
These Java transforms are accessible to the Python pipeline via an expansion service. Check out the
[`README.md`](https://github.com/apache/beam/blob/master/examples/multi-language/README.md#1-start-the-expansion-service)
for instructions on how to download the jar and run this expansion service.
This example aims to demonstrate how to use the `ExternalTransformProvider` utility, which dynamically generates and
provides user-friendly wrappers for external transforms.
Example commands for executing this program:
DirectRunner:
$ python wordcount_external.py \
--runner DirectRunner \
--input <INPUT FILE> \
--output <OUTPUT FILE> \
--expansion_service_port <PORT>
DataflowRunner:
$ python wordcount_external.py \
--runner DataflowRunner \
--temp_location $TEMP_LOCATION \
--project $GCP_PROJECT \
--region $GCP_REGION \
--job_name $JOB_NAME \
--num_workers $NUM_WORKERS \
--input "gs://dataflow-samples/shakespeare/kinglear.txt" \
--output "gs://$GCS_BUCKET/wordcount_external/output" \
--expansion_service_port <PORT>
"""

# Original Java transform is in ExtractWordsProvider.java
EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1"
# Original Java transform is in JavaCountProvider.java
COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1"
# Original Java transform is in WriteWordsProvider.java
WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1"


def run(input_path, output_path, expansion_service_port, pipeline_args):
pipeline_options = PipelineOptions(pipeline_args)

# Discover and get external transforms from this expansion service
provider = ExternalTransformProvider("localhost:" + expansion_service_port)
# Get transforms with identifiers, then use them as you would a regular
# native PTransform
Extract = provider.get_urn(EXTRACT_IDENTIFIER)
Count = provider.get_urn(COUNT_IDENTIFIER)
Write = provider.get_urn(WRITE_IDENTIFIER)

with beam.Pipeline(options=pipeline_options) as p:
lines = p | 'Read' >> ReadFromText(input_path)

words = (lines
| 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
| 'Extract Words' >> Extract())
word_counts = words | 'Count Words' >> Count()
formatted_words = (
word_counts
| 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
row.word, row.count))).with_output_types(
RowTypeConstraint.from_fields([('line', str)])))

formatted_words | 'Write' >> Write(file_path_prefix=output_path)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
required=True,
help='Input file')
parser.add_argument('--output',
dest='output',
required=True,
help='Output file')
parser.add_argument('--expansion_service_port',
dest='expansion_service_port',
required=True,
help='Expansion service port')
known_args, pipeline_args = parser.parse_known_args()

run(known_args.input, known_args.output, known_args.expansion_service_port,
pipeline_args)
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.multilanguage.schematransforms;

import static org.apache.beam.examples.multilanguage.schematransforms.ExtractWordsProvider.Configuration;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

/** Splits a line into separate words and returns each word. */
@AutoService(SchemaTransformProvider.class)
public class ExtractWordsProvider extends TypedSchemaTransformProvider<Configuration> {
public static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build();

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:extract_words:v1";
}

@Override
protected SchemaTransform from(Configuration configuration) {
return new SchemaTransform() {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
return PCollectionRowTuple.of(
"output",
input.get("input").apply(ParDo.of(new ExtractWordsFn())).setRowSchema(OUTPUT_SCHEMA));
}
};
}

static class ExtractWordsFn extends DoFn<Row, Row> {
@ProcessElement
public void processElement(@Element Row element, OutputReceiver<Row> receiver) {
// Split the line into words.
String line = Preconditions.checkStateNotNull(element.getString("line"));
String[] words = line.split("[^\\p{L}]+", -1);

for (String word : words) {
if (!word.isEmpty()) {
receiver.output(Row.withSchema(OUTPUT_SCHEMA).withFieldValue("word", word).build());
}
}
}
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
protected abstract static class Configuration {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.multilanguage.schematransforms;

import static org.apache.beam.examples.multilanguage.schematransforms.JavaCountProvider.Configuration;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

@AutoService(SchemaTransformProvider.class)
public class JavaCountProvider extends TypedSchemaTransformProvider<Configuration> {
@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:count:v1";
}

@Override
protected SchemaTransform from(Configuration configuration) {
return new SchemaTransform() {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
Schema outputSchema =
Schema.builder().addStringField("word").addInt64Field("count").build();

PCollection<Row> wordCounts =
input
.get("input")
.apply(Count.perElement())
.apply(
MapElements.into(TypeDescriptors.rows())
.via(
kv ->
Row.withSchema(outputSchema)
.withFieldValue(
"word",
Preconditions.checkStateNotNull(
kv.getKey().getString("word")))
.withFieldValue("count", kv.getValue())
.build()))
.setRowSchema(outputSchema);

return PCollectionRowTuple.of("output", wordCounts);
}
};
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
protected abstract static class Configuration {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.multilanguage.schematransforms;

import static org.apache.beam.examples.multilanguage.schematransforms.WriteWordsProvider.Configuration;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.TypeDescriptors;

@AutoService(SchemaTransformProvider.class)
public class WriteWordsProvider extends TypedSchemaTransformProvider<Configuration> {
@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:write_words:v1";
}

@Override
protected SchemaTransform from(Configuration configuration) {
return new SchemaTransform() {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
input
.get("input")
.apply(
MapElements.into(TypeDescriptors.strings())
.via(row -> Preconditions.checkStateNotNull(row.getString("line"))))
.apply(TextIO.write().to(configuration.getFilePathPrefix()));

return PCollectionRowTuple.empty(input.getPipeline());
}
};
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
protected abstract static class Configuration {
public static Builder builder() {
return new AutoValue_WriteWordsProvider_Configuration.Builder();
}

@SchemaFieldDescription("Writes to output files with this prefix.")
public abstract String getFilePathPrefix();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setFilePathPrefix(String filePathPrefix);

public abstract Configuration build();
}
}
}
4 changes: 4 additions & 0 deletions sdks/standard_expansion_services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@
#
# Transform identifiers listed in the `skip_transforms` field will be skipped.
#
#
# Any new gradle targets added here should also be added to:
# - sdks/python/build.gradle (as a dependency in the 'generateExternalTransformsConfig' task)
# - sdks/python/test-suites/xlang/build.gradle (look for 'servicesToGenerateFrom')
#
# After making changes here, please run `./gradlew generateExternalTransformsConfig`
# to regenerate the config file at sdks/standard_external_transforms.yaml
#
# Refer to sdks/python/gen_xlang_wrappers.py for more info.

- gradle_target: 'sdks:java:io:expansion-service:shadowJar'
Expand Down

0 comments on commit 2ac5594

Please sign in to comment.