+ * Gradle Preparation
+ * To run this example, your build.gradle file should contain the following task
+ * to execute the pipeline:
+ * {@code
+ * task execute (type:JavaExec) {
+ * main = System.getProperty("mainClass")
+ * classpath = sourceSets.main.runtimeClasspath
+ * systemProperties System.getProperties()
+ * args System.getProperty("exec.args", "").split()
+ * }
+ * }
+ * This task allows running the pipeline via the following command:
+ * {@code
+ * gradle clean execute -DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization \
+ * -Dexec.args="--<argument>=<value> --<argument>=<value>"
+ * }
+ * Running the pipeline
+ * To execute this pipeline, specify the parameters:
+ *
+ * - Data schema
+ * - dataSchemaPath: Path to data schema (JSON format) compatible with BigQuery.
+ * - Exactly one of the following input sources:
+ * - File System
+ * - inputFilePattern: Filepattern for files to read data from
+ * - inputFileFormat: File format of input files. Supported formats: JSON, CSV
+ * - If the input data is in CSV format:
+ * - csvContainsHeaders: `true` if file(s) in bucket to read data from contain headers,
+ * and `false` otherwise
+ * - csvDelimiter: Delimiting character in CSV. Default: use delimiter provided in
+ * csvFormat
+ * - csvFormat: CSV format according to Apache Commons CSV. Default:
+ * [Apache Commons CSV default](https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.html#DEFAULT).
+ * Must exactly match one of the format names listed at:
+ * https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.Predefined.html
+ * - Google Pub/Sub
+ * - pubsubTopic: The Cloud Pub/Sub topic to read from, in the format
+ * 'projects/yourproject/topics/yourtopic'
+ * - Exactly one of the following output sinks:
+ * - File System
+ * - outputDirectory: Directory to write data to
+ * - outputFileFormat: File format of output files. Supported formats: JSON, CSV
+ * - windowDuration: The window duration in which data will be written. Should be specified
+ * only for the 'Pub/Sub -> FileSystem' case. Defaults to 30s.
+ *
+ * Allowed formats are:
+ * - Ns (for seconds, example: 5s),
+ * - Nm (for minutes, example: 12m),
+ * - Nh (for hours, example: 2h).
+ * - Google Cloud BigQuery
+ * - bigQueryTableName: Cloud BigQuery table name to write into
+ * - tempLocation: Folder in a Google Cloud Storage bucket, which is needed for
+ * BigQuery to handle data writing
+ * - Cloud BigTable
+ * - bigTableProjectId: Id of the project where the Cloud BigTable instance to write into
+ * is located
+ * - bigTableInstanceId: Id of the Cloud BigTable instance to write into
+ * - bigTableTableId: Id of the Cloud BigTable table to write into
+ * - bigTableKeyColumnName: Column name to use as a key in Cloud BigTable
+ * - bigTableColumnFamilyName: Column family name to use in Cloud BigTable
+ * - RPC server parameters
+ * - rpcUri: URI for the API calls to RPC server
+ * - batchSize: Size of the batch to send to RPC server per request
+ *
+ * The template also allows the user to supply the following optional parameter:
+ *
+ * - nonTokenizedDeadLetterPath: Folder where data that failed to tokenize will be stored
+ *
+ *
+ * Specify the parameters in the following format:
+ *
+ * {@code
+ * --dataSchemaPath="path-to-data-schema-in-json-format"
+ * --inputFilePattern="path-pattern-to-input-data"
+ * --outputDirectory="path-to-output-directory"
+ * # example for CSV case
+ * --inputFileFormat="CSV"
+ * --outputFileFormat="CSV"
+ * --csvContainsHeaders="true"
+ * --nonTokenizedDeadLetterPath="path-to-errors-rows-writing"
+ * --batchSize=batch-size-number
+ * --rpcUri=http://host:port/tokenize
+ * }
+ *
+ * By default, this will run the pipeline locally with the DirectRunner. To change the runner, specify:
+ *
+ * {@code
+ * --runner=YOUR_SELECTED_RUNNER
+ * }
+ *
+ */
+public class DataTokenization {
+
+ /** Logger for class. */
+ private static final Logger LOG = LoggerFactory.getLogger(DataTokenization.class);
+
+ /** String/String Coder for FailsafeElement. */
+  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
+ FailsafeElementCoder.of(
+ NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));
+
+ /** The default suffix for error tables if dead letter table is not specified. */
+ private static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";
+
+  /** The tag for the main output of the tokenization transform. */
+  private static final TupleTag<Row> TOKENIZATION_OUT = new TupleTag<Row>() {};
+
+  /** The tag for the dead-letter output of the tokenization transform. */
+  static final TupleTag<FailsafeElement<Row, Row>> TOKENIZATION_DEADLETTER_OUT =
+      new TupleTag<FailsafeElement<Row, Row>>() {};
+
+ /**
+ * Main entry point for pipeline execution.
+ *
+ * @param args Command line arguments to the pipeline.
+ */
+ public static void main(String[] args) {
+ DataTokenizationOptions options =
+ PipelineOptionsFactory.fromArgs(args).withValidation().as(DataTokenizationOptions.class);
+ FileSystems.setDefaultPipelineOptions(options);
+
+ run(options);
+ }
+
+ /**
+ * Runs the pipeline to completion with the specified options.
+ *
+ * @param options The execution options.
+ * @return The pipeline result.
+ */
+ @SuppressWarnings({"dereference.of.nullable", "argument.type.incompatible"})
+ public static PipelineResult run(DataTokenizationOptions options) {
+ SchemasUtils schema = null;
+ try {
+ schema = new SchemasUtils(options.getDataSchemaPath(), StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ LOG.error("Failed to retrieve schema for data.", e);
+ }
+ checkArgument(schema != null, "Data schema is mandatory.");
+
+ // Create the pipeline
+ Pipeline pipeline = Pipeline.create(options);
+    // Register the coders for the pipeline
+ CoderRegistry coderRegistry = pipeline.getCoderRegistry();
+ coderRegistry.registerCoderForType(
+ FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);
+ coderRegistry.registerCoderForType(
+ RowCoder.of(schema.getBeamSchema()).getEncodedTypeDescriptor(),
+ RowCoder.of(schema.getBeamSchema()));
+
+ /*
+ * Row/Row Coder for FailsafeElement.
+ */
+    FailsafeElementCoder<Row, Row> coder =
+ FailsafeElementCoder.of(
+ RowCoder.of(schema.getBeamSchema()), RowCoder.of(schema.getBeamSchema()));
+
+ coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);
+
+    PCollection<Row> rows;
+ if (options.getInputFilePattern() != null) {
+ rows = new TokenizationFileSystemIO(options).read(pipeline, schema);
+ } else if (options.getPubsubTopic() != null) {
+ rows =
+ pipeline
+ .apply(
+ "ReadMessagesFromPubsub",
+ PubsubIO.readStrings().fromTopic(options.getPubsubTopic()))
+ .apply(
+ "TransformToBeamRow",
+ new JsonToBeamRow(options.getNonTokenizedDeadLetterPath(), schema));
+ if (options.getOutputDirectory() != null) {
+ rows = rows.apply(Window.into(FixedWindows.of(parseDuration(options.getWindowDuration()))));
+ }
+ } else {
+ throw new IllegalStateException(
+ "No source is provided, please configure File System or Pub/Sub");
+ }
+
+ /*
+ Tokenize data using remote API call
+ */
+ PCollectionTuple tokenizedRows =
+ rows.setRowSchema(schema.getBeamSchema())
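+            // Key every row with a constant (0) so that GroupIntoBatches inside the
+            // tokenization transform can collect all rows into RPC-sized batches.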
+ .apply(
+ MapElements.into(
+ TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.rows()))
+ .via((Row row) -> KV.of(0, row)))
+ .setCoder(KvCoder.of(VarIntCoder.of(), RowCoder.of(schema.getBeamSchema())))
+ .apply(
+ "DsgTokenization",
+ RowToTokenizedRow.newBuilder()
+ .setBatchSize(options.getBatchSize())
+ .setRpcURI(options.getRpcUri())
+ .setSchema(schema.getBeamSchema())
+ .setSuccessTag(TOKENIZATION_OUT)
+ .setFailureTag(TOKENIZATION_DEADLETTER_OUT)
+ .build());
+
+ String csvDelimiter = options.getCsvDelimiter();
+ if (options.getNonTokenizedDeadLetterPath() != null) {
+ /*
+ Write tokenization errors to dead-letter sink
+ */
+ tokenizedRows
+ .get(TOKENIZATION_DEADLETTER_OUT)
+ .apply(
+ "ConvertToCSV",
+ MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
+ .via(
+                      (FailsafeElement<Row, Row> fse) ->
+ FailsafeElement.of(
+ new RowToCsv(csvDelimiter).getCsvFromRow(fse.getOriginalPayload()),
+ new RowToCsv(csvDelimiter).getCsvFromRow(fse.getPayload()))))
+ .apply(
+ "WriteTokenizationErrorsToFS",
+ ErrorConverters.WriteErrorsToTextIO.newBuilder()
+ .setErrorWritePath(options.getNonTokenizedDeadLetterPath())
+ .setTranslateFunction(SerializableFunctions.getCsvErrorConverter())
+ .build());
+ }
+
+ if (options.getOutputDirectory() != null) {
+ new TokenizationFileSystemIO(options)
+ .write(tokenizedRows.get(TOKENIZATION_OUT), schema.getBeamSchema());
+ } else if (options.getBigQueryTableName() != null) {
+ WriteResult writeResult =
+ TokenizationBigQueryIO.write(
+ tokenizedRows.get(TOKENIZATION_OUT),
+ options.getBigQueryTableName(),
+ schema.getBigQuerySchema());
+ writeResult
+ .getFailedInsertsWithErr()
+ .apply(
+ "WrapInsertionErrors",
+ MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
+ .via(TokenizationBigQueryIO::wrapBigQueryInsertError))
+ .setCoder(FAILSAFE_ELEMENT_CODER)
+ .apply(
+ "WriteInsertionFailedRecords",
+ ErrorConverters.WriteStringMessageErrors.newBuilder()
+ .setErrorRecordsTable(
+ options.getBigQueryTableName() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
+ .setErrorRecordsTableSchema(DEADLETTER_SCHEMA)
+ .build());
+ } else if (options.getBigTableInstanceId() != null) {
+ new TokenizationBigTableIO(options)
+ .write(tokenizedRows.get(TOKENIZATION_OUT), schema.getBeamSchema());
+ } else {
+ throw new IllegalStateException(
+ "No sink is provided, please configure BigQuery or BigTable.");
+ }
+
+ return pipeline.run();
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/README.md b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/README.md
new file mode 100644
index 0000000000000..7d92f604ee4f5
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/README.md
@@ -0,0 +1,172 @@
+
+
+# Apache Beam pipeline example to tokenize data using remote RPC server
+
+This directory contains an Apache Beam example that creates a pipeline to read data from one of
+the supported sources, tokenize it with external API calls to a remote RPC server, and write it into one of the supported sinks.
+
+Supported data formats:
+
+- JSON
+- CSV
+
+Supported input sources:
+
+- File system
+- [Google Pub/Sub](https://cloud.google.com/pubsub)
+
+Supported destination sinks:
+
+- File system
+- [Google Cloud BigQuery](https://cloud.google.com/bigquery)
+- [Cloud BigTable](https://cloud.google.com/bigtable)
+
+Supported data schema format:
+
+- JSON with an array of fields described in BigQuery format
+
+In the main scenario, the template creates an Apache Beam pipeline that reads data in CSV or
+JSON format from a specified input source, sends the data to an external processing server,
+receives the processed data, and writes it into a specified output sink.
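+
+For illustration, a minimal data schema in the BigQuery-style format described above might look
+like the following (the field names here are hypothetical placeholders):
+
+```json
+{
+  "fields": [
+    {"name": "user_id", "type": "STRING", "mode": "REQUIRED"},
+    {"name": "email", "type": "STRING", "mode": "NULLABLE"}
+  ]
+}
+```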
+
+## Requirements
+
+- Java 8
+- One of the supported sources to read data from
+- One of the supported destination sinks to write data into
+- A configured RPC server to tokenize data
+
+## Getting Started
+
+This section describes what is needed to get the template up and running.
+
+- Gradle preparation
+- Local execution
+- Running as a Dataflow Template
+ - Setting Up Project Environment
+ - Build Data Tokenization Dataflow Flex Template
+ - Creating the Dataflow Flex Template
+ - Executing Template
+
+## Gradle preparation
+
+To run this example, your `build.gradle` file should contain the following task to execute the pipeline:
+
+```
+task execute (type:JavaExec) {
+ main = System.getProperty("mainClass")
+ classpath = sourceSets.main.runtimeClasspath
+ systemProperties System.getProperties()
+ args System.getProperty("exec.args", "").split()
+}
+```
+
+This task allows running the pipeline via the following command:
+
+```bash
+gradle clean execute -DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization \
+    -Dexec.args="--<argument>=<value> --<argument>=<value>"
+```
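+
+For example, a local run that reads JSON files might look like this (all paths and the RPC
+endpoint below are placeholders):
+
+```bash
+gradle clean execute -DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization \
+    -Dexec.args="--dataSchemaPath=/path/to/schema.json --inputFilePattern=/path/to/input/*.json --outputDirectory=/path/to/output/ --rpcUri=http://localhost:8080/tokenize --batchSize=10"
+```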
+
+## Running the pipeline
+
+To execute this pipeline, specify the parameters:
+
+- Data schema
+ - **dataSchemaPath**: Path to data schema (JSON format) compatible with BigQuery.
+- Exactly one of the following input sources:
+ - File System
+ - **inputFilePattern**: Filepattern for files to read data from
+ - **inputFileFormat**: File format of input files. Supported formats: JSON, CSV
+    - If the input data is in CSV format:
+ - **csvContainsHeaders**: `true` if file(s) in bucket to read data from contain headers,
+ and `false` otherwise
+ - **csvDelimiter**: Delimiting character in CSV. Default: use delimiter provided in
+ csvFormat
+      - **csvFormat**: CSV format according to Apache Commons CSV. Default:
+        [Apache Commons CSV default](https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.html#DEFAULT).
+        Must exactly match one of the format names listed at:
+        https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.Predefined.html
+ - Google Pub/Sub
+    - **pubsubTopic**: The Cloud Pub/Sub topic to read from, in the format
+      'projects/yourproject/topics/yourtopic'
+- Exactly one of the following output sinks:
+ - File System
+ - **outputDirectory**: Directory to write data to
+ - **outputFileFormat**: File format of output files. Supported formats: JSON, CSV
+    - **windowDuration**: The window duration in which data will be written. Should be specified
+      only for the 'Pub/Sub -> FileSystem' case. Defaults to 30s.
+
+ Allowed formats are:
+ - Ns (for seconds, example: 5s),
+ - Nm (for minutes, example: 12m),
+ - Nh (for hours, example: 2h).
+ - Google Cloud BigQuery
+ - **bigQueryTableName**: Cloud BigQuery table name to write into
+ - **tempLocation**: Folder in a Google Cloud Storage bucket, which is needed for
+ BigQuery to handle data writing
+ - Cloud BigTable
+ - **bigTableProjectId**: Id of the project where the Cloud BigTable instance to write into
+ is located
+ - **bigTableInstanceId**: Id of the Cloud BigTable instance to write into
+ - **bigTableTableId**: Id of the Cloud BigTable table to write into
+ - **bigTableKeyColumnName**: Column name to use as a key in Cloud BigTable
+ - **bigTableColumnFamilyName**: Column family name to use in Cloud BigTable
+- RPC server parameters
+ - **rpcUri**: URI for the API calls to RPC server
+ - **batchSize**: Size of the batch to send to RPC server per request
+
+The template also allows the user to supply the following optional parameter:
+
+- **nonTokenizedDeadLetterPath**: Folder where data that failed to tokenize will be stored
+
+The template also allows the user to override the following environment variable:
+
+- **MAX_BUFFERING_DURATION_MS**: Max duration of buffering rows in milliseconds. Default value: 100ms.
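+
+For example, for a local run you could export the variable before launching the pipeline (the
+value here is illustrative):
+
+```bash
+export MAX_BUFFERING_DURATION_MS=250
+```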
+
+Specify the parameters in the following format:
+
+```bash
+--dataSchemaPath="path-to-data-schema-in-json-format"
+--inputFilePattern="path-pattern-to-input-data"
+--outputDirectory="path-to-output-directory"
+# example for CSV case
+--inputFileFormat="CSV"
+--outputFileFormat="CSV"
+--csvContainsHeaders="true"
+--nonTokenizedDeadLetterPath="path-to-errors-rows-writing"
+--batchSize=batch-size-number
+--rpcUri=http://host:port/tokenize
+```
+
+By default, this will run the pipeline locally with the DirectRunner. To change the runner, specify:
+
+```bash
+--runner=YOUR_SELECTED_RUNNER
+```
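+
+For instance, a Dataflow run might add options like the following (project, region, and bucket
+values are placeholders):
+
+```bash
+--runner=DataflowRunner
+--project=your-project-id
+--region=us-central1
+--tempLocation=gs://your-bucket/temp/
+```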
+
+See the [documentation](http://beam.apache.org/get-started/quickstart/) and
+the [Examples README](../../../../../../../../../README.md) for more information about how to run this example.
+
+## Running as a Dataflow Template
+
+This example also exists as a Google Dataflow Template, which you can build and run using Google Cloud Platform. See
+the template documentation [README.md](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/v2/protegrity-data-tokenization/README.md) for
+more information.
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/options/DataTokenizationOptions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/options/DataTokenizationOptions.java
new file mode 100644
index 0000000000000..c68a5378e4878
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/options/DataTokenizationOptions.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.options;
+
+import org.apache.beam.examples.complete.datatokenization.transforms.io.TokenizationBigTableIO;
+import org.apache.beam.examples.complete.datatokenization.transforms.io.TokenizationFileSystemIO.FileSystemPipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * The {@link DataTokenizationOptions} interface provides the custom execution options passed by the
+ * executor at the command-line.
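+ *
+ * For example, a hypothetical invocation might include (all values are placeholders):
+ *
+ * {@code
+ * --dataSchemaPath=gs://bucket/schema.json --rpcUri=http://host:port/tokenize --batchSize=10
+ * }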
+ */
+public interface DataTokenizationOptions
+ extends PipelineOptions, FileSystemPipelineOptions, TokenizationBigTableIO.BigTableOptions {
+
+ @Description("Path to data schema (JSON format) compatible with BigQuery.")
+ String getDataSchemaPath();
+
+ void setDataSchemaPath(String dataSchemaPath);
+
+ @Description(
+ "The Cloud Pub/Sub topic to read from."
+ + "The name should be in the format of "
+ + "projects//topics/.")
+ String getPubsubTopic();
+
+ void setPubsubTopic(String pubsubTopic);
+
+ @Description("Cloud BigQuery table name to write into.")
+ String getBigQueryTableName();
+
+ void setBigQueryTableName(String bigQueryTableName);
+
+ // Tokenization API specific parameters
+ @Description("URI for the API calls to RPC server.")
+ String getRpcUri();
+
+  void setRpcUri(String rpcUri);
+
+ @Description("Size of the batch to send to RPC server per request.")
+ @Default.Integer(10)
+ Integer getBatchSize();
+
+ void setBatchSize(Integer batchSize);
+
+ @Description("Dead-Letter path to store not-tokenized data")
+ String getNonTokenizedDeadLetterPath();
+
+ void setNonTokenizedDeadLetterPath(String nonTokenizedDeadLetterPath);
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/options/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/options/package-info.java
new file mode 100644
index 0000000000000..b61fce1519103
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/options/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright (C) 2020 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/** Protegrity Data Tokenization template for Google Cloud Teleport. */
+package org.apache.beam.examples.complete.datatokenization.options;
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/package-info.java
new file mode 100644
index 0000000000000..09d4df153c3bf
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright (C) 2020 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/** Protegrity Data Tokenization template for Google Cloud Teleport. */
+package org.apache.beam.examples.complete.datatokenization;
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java
new file mode 100644
index 0000000000000..5ddf706d6f2da
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.rowToJson;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElementCoder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.util.RowJsonUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.Gson;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.JsonArray;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.JsonObject;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link DataProtectors} transform buffers input rows and sends them to the RPC server in
+ * batches. A batch is sent once the number of buffered rows reaches the specified batch size. The
+ * final, possibly partial, batch is flushed when the last row arrives at the DoFn, even if fewer
+ * rows than the batch size have been buffered.
+ */
+public class DataProtectors {
+
+ /** Logger for class. */
+ private static final Logger LOG = LoggerFactory.getLogger(DataProtectors.class);
+
+ public static final String ID_FIELD_NAME = "ID";
+ private static final Long MAX_BUFFERING_DURATION_MS =
+ Long.valueOf(System.getenv().getOrDefault("MAX_BUFFERING_DURATION_MS", "100"));
+
+ /**
+   * The {@link RowToTokenizedRow} transform converts {@link Row} objects into tokenized {@link
+   * Row} objects. Failed records are emitted as {@link FailsafeElement} objects so the original
+   * payload of the incoming record can be maintained across multiple series of transforms.
+ */
+ @AutoValue
+  public abstract static class RowToTokenizedRow<T>
+      extends PTransform<PCollection<KV<Integer, Row>>, PCollectionTuple> {
+
+    public static <T> Builder<T> newBuilder() {
+ return new AutoValue_DataProtectors_RowToTokenizedRow.Builder<>();
+ }
+
+    public abstract TupleTag<Row> successTag();
+
+    public abstract TupleTag<FailsafeElement<Row, Row>> failureTag();
+
+ public abstract Schema schema();
+
+ public abstract int batchSize();
+
+ public abstract String rpcURI();
+
+ @Override
+    public PCollectionTuple expand(PCollection<KV<Integer, Row>> inputRows) {
+      FailsafeElementCoder<Row, Row> coder =
+ FailsafeElementCoder.of(RowCoder.of(schema()), RowCoder.of(schema()));
+
+ Duration maxBuffering = Duration.millis(MAX_BUFFERING_DURATION_MS);
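+      // GroupIntoBatches emits a batch once batchSize elements have accumulated, or once the
+      // max buffering duration elapses, so a trailing partial batch is still flushed.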
+ PCollectionTuple pCollectionTuple =
+ inputRows
+ .apply(
+ "GroupRowsIntoBatches",
+                  GroupIntoBatches.<Integer, Row>ofSize(batchSize())
+ .withMaxBufferingDuration(maxBuffering))
+ .apply(
+ "Tokenize",
+ ParDo.of(new TokenizationFn(schema(), rpcURI(), failureTag()))
+ .withOutputTags(successTag(), TupleTagList.of(failureTag())));
+
+ return PCollectionTuple.of(
+ successTag(), pCollectionTuple.get(successTag()).setRowSchema(schema()))
+ .and(failureTag(), pCollectionTuple.get(failureTag()).setCoder(coder));
+ }
+
+ /** Builder for {@link RowToTokenizedRow}. */
+ @AutoValue.Builder
+    public abstract static class Builder<T> {
+
+      public abstract Builder<T> setSuccessTag(TupleTag<Row> successTag);
+
+      public abstract Builder<T> setFailureTag(TupleTag<FailsafeElement<Row, Row>> failureTag);
+
+      public abstract Builder<T> setSchema(Schema schema);
+
+      public abstract Builder<T> setBatchSize(int batchSize);
+
+      public abstract Builder<T> setRpcURI(String rpcURI);
+
+      public abstract RowToTokenizedRow<T> build();
+ }
+ }
+
+  /** DoFn that tokenizes batches of rows via calls to a remote RPC server. */
+ @SuppressWarnings("initialization.static.fields.uninitialized")
+  public static class TokenizationFn extends DoFn<KV<Integer, Iterable<Row>>, Row> {
+
+ private static Schema schemaToRpc;
+ private static CloseableHttpClient httpclient;
+ private static ObjectMapper objectMapperSerializerForSchema;
+ private static ObjectMapper objectMapperDeserializerForSchema;
+
+ private final Schema schema;
+ private final String rpcURI;
+    private final TupleTag<FailsafeElement<Row, Row>> failureTag;
+
+    private Map<String, Row> inputRowsWithIds;
+
+ public TokenizationFn(
+        Schema schema, String rpcURI, TupleTag<FailsafeElement<Row, Row>> failureTag) {
+ this.schema = schema;
+ this.rpcURI = rpcURI;
+ this.failureTag = failureTag;
+ this.inputRowsWithIds = new HashMap<>();
+ }
+
+ @Setup
+ public void setup() {
+
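+      // Extend the user-provided schema with a synthetic ID column; the ID is used to match
+      // tokenized rows returned by the RPC server back to their original input rows.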
+      List<Field> fields = schema.getFields();
+ fields.add(Field.of(ID_FIELD_NAME, FieldType.STRING));
+ schemaToRpc = new Schema(fields);
+
+ objectMapperSerializerForSchema =
+ RowJsonUtils.newObjectMapperWith(RowJson.RowJsonSerializer.forSchema(schemaToRpc));
+
+ objectMapperDeserializerForSchema =
+ RowJsonUtils.newObjectMapperWith(RowJson.RowJsonDeserializer.forSchema(schemaToRpc));
+
+ httpclient = HttpClients.createDefault();
+ }
+
+ @Teardown
+ public void close() {
+ try {
+ httpclient.close();
+ } catch (IOException exception) {
+ String exceptionMessage = exception.getMessage();
+ if (exceptionMessage != null) {
+ LOG.warn("Can't close connection: {}", exceptionMessage);
+ }
+ }
+ }
+
+ @ProcessElement
+ @SuppressWarnings("argument.type.incompatible")
+    public void process(@Element KV<Integer, Iterable<Row>> element, ProcessContext context) {
+      Iterable<Row> rows = element.getValue();
+
+ try {
+ for (Row outputRow : getTokenizedRow(rows)) {
+ context.output(outputRow);
+ }
+ } catch (Exception e) {
+ for (Row outputRow : rows) {
+ context.output(
+ failureTag,
+ FailsafeElement.of(outputRow, outputRow)
+ .setErrorMessage(e.getMessage())
+ .setStacktrace(Throwables.getStackTraceAsString(e)));
+ }
+ }
+ }
+
+    private ArrayList<String> rowsToJsons(Iterable<Row> inputRows) {
+      ArrayList<String> jsons = new ArrayList<>();
+      Map<String, Row> inputRowsWithIds = new HashMap<>();
+ for (Row inputRow : inputRows) {
+
+ Row.Builder builder = Row.withSchema(schemaToRpc);
+ for (Schema.Field field : schemaToRpc.getFields()) {
+ if (inputRow.getSchema().hasField(field.getName())) {
+ builder = builder.addValue(inputRow.getValue(field.getName()));
+ }
+ }
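+        // Tag the outgoing row with a random UUID so that the tokenized response row can be
+        // matched back to its original input row in getTokenizedRow().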
+ String id = UUID.randomUUID().toString();
+ builder = builder.addValue(id);
+ inputRowsWithIds.put(id, inputRow);
+
+ Row row = builder.build();
+
+ jsons.add(rowToJson(objectMapperSerializerForSchema, row));
+ }
+ this.inputRowsWithIds = inputRowsWithIds;
+ return jsons;
+ }
+
+    private String formatJsonsToRpcBatch(Iterable<String> jsons) {
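+      // Wrap the comma-joined JSON rows into a single {"data": [ ... ]} request payload.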
+ StringBuilder stringBuilder = new StringBuilder(String.join(",", jsons));
+ stringBuilder.append("]").insert(0, "{\"data\": [").append("}");
+ return stringBuilder.toString();
+ }
+
+ @SuppressWarnings("argument.type.incompatible")
+    private ArrayList<Row> getTokenizedRow(Iterable<Row> inputRows) throws IOException {
+      ArrayList<Row> outputRows = new ArrayList<>();
+
+ CloseableHttpResponse response =
+ sendRpc(formatJsonsToRpcBatch(rowsToJsons(inputRows)).getBytes(Charset.defaultCharset()));
+
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ LOG.error("Send to RPC '{}' failed with '{}'", this.rpcURI, response.getStatusLine());
+ }
+
+ String tokenizedData =
+ IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
+
+ Gson gson = new Gson();
+ JsonArray jsonTokenizedRows =
+ gson.fromJson(tokenizedData, JsonObject.class).getAsJsonArray("data");
+
+ for (int i = 0; i < jsonTokenizedRows.size(); i++) {
+ Row tokenizedRow =
+ RowJsonUtils.jsonToRow(
+ objectMapperDeserializerForSchema, jsonTokenizedRows.get(i).toString());
+ Row.FieldValueBuilder rowBuilder =
+ Row.fromRow(this.inputRowsWithIds.get(tokenizedRow.getString(ID_FIELD_NAME)));
+ for (Schema.Field field : schemaToRpc.getFields()) {
+ if (field.getName().equals(ID_FIELD_NAME)) {
+ continue;
+ }
+ rowBuilder =
+ rowBuilder.withFieldValue(field.getName(), tokenizedRow.getValue(field.getName()));
+ }
+ outputRows.add(rowBuilder.build());
+ }
+
+ return outputRows;
+ }
+
+ private CloseableHttpResponse sendRpc(byte[] data) throws IOException {
+ HttpPost httpPost = new HttpPost(rpcURI);
+ HttpEntity stringEntity = new ByteArrayEntity(data, ContentType.APPLICATION_JSON);
+ httpPost.setEntity(stringEntity);
+ return httpclient.execute(httpPost);
+ }
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/JsonToBeamRow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/JsonToBeamRow.java
new file mode 100644
index 0000000000000..a6c87a368cf98
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/JsonToBeamRow.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms;
+
+import static org.apache.beam.examples.complete.datatokenization.DataTokenization.FAILSAFE_ELEMENT_CODER;
+
+import org.apache.beam.examples.complete.datatokenization.utils.ErrorConverters;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
+import org.apache.beam.examples.complete.datatokenization.utils.SchemasUtils;
+import org.apache.beam.sdk.transforms.JsonToRow;
+import org.apache.beam.sdk.transforms.JsonToRow.ParseResult;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+
+/** The {@link JsonToBeamRow} transform converts JSON strings to Beam rows. */
+public class JsonToBeamRow extends PTransform<PCollection<String>, PCollection<Row>> {
+
+ private final String failedToParseDeadLetterPath;
+ private final transient SchemasUtils schema;
+
+ public JsonToBeamRow(String failedToParseDeadLetterPath, SchemasUtils schema) {
+ this.failedToParseDeadLetterPath = failedToParseDeadLetterPath;
+ this.schema = schema;
+ }
+
+ @Override
+ @SuppressWarnings("argument.type.incompatible")
+  public PCollection<Row> expand(PCollection<String> jsons) {
+ ParseResult rows =
+ jsons.apply(
+ "JsonToRow",
+ JsonToRow.withExceptionReporting(schema.getBeamSchema()).withExtendedErrorInfo());
+
+ if (failedToParseDeadLetterPath != null) {
+ /*
+ * Write Row conversion errors to filesystem specified path
+ */
+ rows.getFailedToParseLines()
+ .apply(
+ "ToFailsafeElement",
+ MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
+ .via(
+ (Row errRow) ->
+ FailsafeElement.of(
+ Strings.nullToEmpty(errRow.getString("line")),
+ Strings.nullToEmpty(errRow.getString("line")))
+ .setErrorMessage(Strings.nullToEmpty(errRow.getString("err")))))
+ .apply(
+ "WriteCsvConversionErrorsToFS",
+ ErrorConverters.WriteErrorsToTextIO.newBuilder()
+ .setErrorWritePath(failedToParseDeadLetterPath)
+ .setTranslateFunction(SerializableFunctions.getCsvErrorConverter())
+ .build());
+ }
+
+ return rows.getResults().setRowSchema(schema.getBeamSchema());
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/SerializableFunctions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/SerializableFunctions.java
new file mode 100644
index 0000000000000..1e5df0776e166
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/SerializableFunctions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+/** The {@link SerializableFunctions} class stores static serializable functions. */
+public class SerializableFunctions {
+
+  private static final SerializableFunction<FailsafeElement<String, String>, String>
+      csvErrorConverter =
+          (FailsafeElement<String, String> failsafeElement) -> {
+            ArrayList<String> outputRow = new ArrayList<>();
+ final String message = failsafeElement.getOriginalPayload();
+ String timestamp = Instant.now().toString();
+ outputRow.add(timestamp);
+ outputRow.add(failsafeElement.getErrorMessage());
+ outputRow.add(failsafeElement.getStacktrace());
+ // Only set the payload if it's populated on the message.
+ if (failsafeElement.getOriginalPayload() != null) {
+ outputRow.add(message);
+ }
+
+ return String.join(",", outputRow);
+ };
+
+  public static SerializableFunction<FailsafeElement<String, String>, String>
+      getCsvErrorConverter() {
+ return csvErrorConverter;
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java
new file mode 100644
index 0000000000000..fe8f4c1afad8b
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms.io;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The {@link TokenizationBigQueryIO} class for writing data from the template to BigQuery. */
+public class TokenizationBigQueryIO {
+
+ /** Logger for class. */
+ private static final Logger LOG = LoggerFactory.getLogger(TokenizationBigQueryIO.class);
+
+ public static WriteResult write(
+      PCollection<Row> input, String bigQueryTableName, TableSchema schema) {
+ return input
+ .apply("RowToTableRow", ParDo.of(new RowToTableRowFn()))
+ .apply(
+ "WriteSuccessfulRecords",
+ org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.writeTableRows()
+ .withCreateDisposition(
+ org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition
+ .CREATE_IF_NEEDED)
+ .withWriteDisposition(
+ org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition
+ .WRITE_APPEND)
+ .withExtendedErrorInfo()
+ .withMethod(
+ org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.STREAMING_INSERTS)
+ .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
+ .withSchema(schema)
+ .to(bigQueryTableName));
+ }
+
+ /**
+ * Method to wrap a {@link BigQueryInsertError} into a {@link FailsafeElement}.
+ *
+ * @param insertError BigQueryInsert error.
+ * @return FailsafeElement object.
+ */
+  public static FailsafeElement<String, String> wrapBigQueryInsertError(
+ BigQueryInsertError insertError) {
+
+ FailsafeElement failsafeElement;
+ try {
+
+ failsafeElement =
+ FailsafeElement.of(
+ insertError.getRow().toPrettyString(), insertError.getRow().toPrettyString());
+ failsafeElement.setErrorMessage(insertError.getError().toPrettyString());
+
+ } catch (IOException e) {
+ TokenizationBigQueryIO.LOG.error("Failed to wrap BigQuery insert error.");
+ throw new RuntimeException(e);
+ }
+ return failsafeElement;
+ }
+
+ /**
+ * The {@link RowToTableRowFn} class converts a row to tableRow using {@link
+ * BigQueryUtils#toTableRow()}.
+ */
+  public static class RowToTableRowFn extends DoFn<Row, TableRow> {
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ Row row = context.element();
+ context.output(BigQueryUtils.toTableRow(row));
+ }
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java
new file mode 100644
index 0000000000000..d7d1c3e972320
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms.io;
+
+import com.google.bigtable.v2.Mutation;
+import com.google.protobuf.ByteString;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.examples.complete.datatokenization.options.DataTokenizationOptions;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteResult;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The {@link TokenizationBigTableIO} class for writing data from the template to BigTable. */
+public class TokenizationBigTableIO {
+
+ /** Logger for class. */
+ private static final Logger LOG = LoggerFactory.getLogger(TokenizationBigTableIO.class);
+
+ private final DataTokenizationOptions options;
+
+ public TokenizationBigTableIO(DataTokenizationOptions options) {
+ this.options = options;
+ }
+
+  public PDone write(PCollection<Row> input, Schema schema) {
+ return input
+ .apply("ConvertToBigTableFormat", ParDo.of(new TransformToBigTableFormat(schema)))
+ .apply(
+ "WriteToBigTable",
+ BigtableIO.write()
+ .withProjectId(options.getBigTableProjectId())
+ .withInstanceId(options.getBigTableInstanceId())
+ .withTableId(options.getBigTableTableId())
+ .withWriteResults())
+ .apply("LogRowCount", new LogSuccessfulRows());
+ }
+
+  static class TransformToBigTableFormat extends DoFn<Row, KV<ByteString, Iterable<Mutation>>> {
+
+ private final Schema schema;
+
+ TransformToBigTableFormat(Schema schema) {
+ this.schema = schema;
+ }
+
+ @ProcessElement
+ public void processElement(
+        @Element Row in, OutputReceiver<KV<ByteString, Iterable<Mutation>>> out, ProcessContext c) {
+ DataTokenizationOptions options = c.getPipelineOptions().as(DataTokenizationOptions.class);
+ // Mapping every field in provided Row to Mutation.SetCell, which will create/update
+ // cell content with provided data
+      Set<Mutation> mutations =
+ schema.getFields().stream()
+ .map(Schema.Field::getName)
+ // Ignoring key field, otherwise it will be added as regular column
+ .filter(fieldName -> !Objects.equals(fieldName, options.getBigTableKeyColumnName()))
+ .map(fieldName -> Pair.of(fieldName, in.getString(fieldName)))
+ .map(
+ pair ->
+ Mutation.newBuilder()
+ .setSetCell(
+ Mutation.SetCell.newBuilder()
+ .setFamilyName(options.getBigTableColumnFamilyName())
+ .setColumnQualifier(
+ ByteString.copyFrom(pair.getKey(), StandardCharsets.UTF_8))
+ .setValue(
+ ByteString.copyFrom(pair.getValue(), StandardCharsets.UTF_8))
+ .setTimestampMicros(System.currentTimeMillis() * 1000)
+ .build())
+ .build())
+ .collect(Collectors.toSet());
+ // Converting key value to BigTable format
+ String columnName = in.getString(options.getBigTableKeyColumnName());
+ if (columnName != null) {
+ ByteString key = ByteString.copyFrom(columnName, StandardCharsets.UTF_8);
+ out.output(KV.of(key, mutations));
+ }
+ }
+ }
+
+  static class LogSuccessfulRows extends PTransform<PCollection<BigtableWriteResult>, PDone> {
+
+ @Override
+    public PDone expand(PCollection<BigtableWriteResult> input) {
+ input.apply(
+ ParDo.of(
+              new DoFn<BigtableWriteResult, Void>() {
+ @ProcessElement
+ public void processElement(@Element BigtableWriteResult in) {
+ LOG.info("Successfully wrote {} rows.", in.getRowsWritten());
+ }
+ }));
+ return PDone.in(input.getPipeline());
+ }
+ }
+
+ /**
+ * Necessary {@link PipelineOptions} options for Pipelines that perform write operations to
+ * BigTable.
+ */
+ public interface BigTableOptions extends PipelineOptions {
+
+ @Description("Id of the project where the Cloud BigTable instance to write into is located.")
+ String getBigTableProjectId();
+
+ void setBigTableProjectId(String bigTableProjectId);
+
+ @Description("Id of the Cloud BigTable instance to write into.")
+ String getBigTableInstanceId();
+
+ void setBigTableInstanceId(String bigTableInstanceId);
+
+ @Description("Id of the Cloud BigTable table to write into.")
+ String getBigTableTableId();
+
+ void setBigTableTableId(String bigTableTableId);
+
+ @Description("Column name to use as a key in Cloud BigTable.")
+ String getBigTableKeyColumnName();
+
+ void setBigTableKeyColumnName(String bigTableKeyColumnName);
+
+ @Description("Column family name to use in Cloud BigTable.")
+ String getBigTableColumnFamilyName();
+
+ void setBigTableColumnFamilyName(String bigTableColumnFamilyName);
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationFileSystemIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationFileSystemIO.java
new file mode 100644
index 0000000000000..54805e11e2d5d
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationFileSystemIO.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms.io;
+
+import org.apache.beam.examples.complete.datatokenization.options.DataTokenizationOptions;
+import org.apache.beam.examples.complete.datatokenization.transforms.JsonToBeamRow;
+import org.apache.beam.examples.complete.datatokenization.transforms.SerializableFunctions;
+import org.apache.beam.examples.complete.datatokenization.utils.CsvConverters;
+import org.apache.beam.examples.complete.datatokenization.utils.ErrorConverters;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
+import org.apache.beam.examples.complete.datatokenization.utils.RowToCsv;
+import org.apache.beam.examples.complete.datatokenization.utils.SchemasUtils;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ToJson;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+/** The {@link TokenizationFileSystemIO} class reads/writes data from/to a file system. */
+public class TokenizationFileSystemIO {
+
+ /** The tag for the headers of the CSV if required. */
+  static final TupleTag<String> CSV_HEADERS = new TupleTag<String>() {};
+
+ /** The tag for the lines of the CSV. */
+  static final TupleTag<String> CSV_LINES = new TupleTag<String>() {};
+
+ /** The tag for the dead-letter output. */
+  static final TupleTag<FailsafeElement<String, String>> PROCESSING_DEADLETTER_OUT =
+      new TupleTag<FailsafeElement<String, String>>() {};
+
+ /** The tag for the main output. */
+  static final TupleTag<FailsafeElement<String, String>> PROCESSING_OUT =
+      new TupleTag<FailsafeElement<String, String>>() {};
+
+  /** Supported file formats for reading from and writing to the file system. */
+ public enum FORMAT {
+ JSON,
+ CSV
+ }
+
+ /**
+ * Necessary {@link PipelineOptions} options for Pipelines that operate with JSON/CSV data in FS.
+ */
+ public interface FileSystemPipelineOptions extends PipelineOptions {
+
+ @Description("Filepattern for files to read data from")
+ String getInputFilePattern();
+
+ void setInputFilePattern(String inputFilePattern);
+
+ @Description("File format of input files. Supported formats: JSON, CSV")
+ @Default.Enum("JSON")
+ TokenizationFileSystemIO.FORMAT getInputFileFormat();
+
+ void setInputFileFormat(FORMAT inputFileFormat);
+
+ @Description("Directory to write data to")
+ String getOutputDirectory();
+
+ void setOutputDirectory(String outputDirectory);
+
+ @Description("File format of output files. Supported formats: JSON, CSV")
+ @Default.Enum("JSON")
+ TokenizationFileSystemIO.FORMAT getOutputFileFormat();
+
+ void setOutputFileFormat(FORMAT outputFileFormat);
+
+ @Description(
+ "The window duration in which data will be written. "
+ + "Should be specified only for 'Pub/Sub -> FS' case. Defaults to 30s. "
+ + "Allowed formats are: "
+ + "Ns (for seconds, example: 5s), "
+ + "Nm (for minutes, example: 12m), "
+ + "Nh (for hours, example: 2h).")
+ @Default.String("30s")
+ String getWindowDuration();
+
+ void setWindowDuration(String windowDuration);
+
+ // CSV parameters
+ @Description("If file(s) contain headers")
+ Boolean getCsvContainsHeaders();
+
+ void setCsvContainsHeaders(Boolean csvContainsHeaders);
+
+ @Description("Delimiting character in CSV. Default: use delimiter provided in csvFormat")
+ @Default.InstanceFactory(CsvConverters.DelimiterFactory.class)
+ String getCsvDelimiter();
+
+ void setCsvDelimiter(String csvDelimiter);
+
+ @Description(
+ "Csv format according to Apache Commons CSV format. Default is: Apache Commons CSV"
+ + " default\n"
+ + "https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.html#DEFAULT\n"
+ + "Must match format names exactly found at: "
+ + "https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.Predefined.html")
+ @Default.String("Default")
+ String getCsvFormat();
+
+ void setCsvFormat(String csvFormat);
+ }
+
+ private final DataTokenizationOptions options;
+
+ public TokenizationFileSystemIO(DataTokenizationOptions options) {
+ this.options = options;
+ }
+
+  public PCollection<Row> read(Pipeline pipeline, SchemasUtils schema) {
+ switch (options.getInputFileFormat()) {
+ case JSON:
+ return readJson(pipeline)
+ .apply(new JsonToBeamRow(options.getNonTokenizedDeadLetterPath(), schema));
+ case CSV:
+ return readCsv(pipeline, schema)
+ .apply(new JsonToBeamRow(options.getNonTokenizedDeadLetterPath(), schema));
+ default:
+ throw new IllegalStateException(
+ "No valid format for input data is provided. Please, choose JSON or CSV.");
+ }
+ }
+
+  private PCollection<String> readJson(Pipeline pipeline) {
+ return pipeline.apply("ReadJsonFromFiles", TextIO.read().from(options.getInputFilePattern()));
+ }
+
+  private PCollection<String> readCsv(Pipeline pipeline, SchemasUtils schema) {
+ /*
+ * Step 1: Read CSV file(s) from File System using {@link CsvConverters.ReadCsv}.
+ */
+ PCollectionTuple csvLines = readCsv(pipeline);
+ /*
+ * Step 2: Convert lines to Json.
+ */
+ PCollectionTuple jsons = csvLineToJson(csvLines, schema.getJsonBeamSchema());
+
+ if (options.getNonTokenizedDeadLetterPath() != null) {
+ /*
+ * Step 3: Write jsons to dead-letter that weren't successfully processed.
+ */
+ jsons
+ .get(PROCESSING_DEADLETTER_OUT)
+ .apply(
+ "WriteCsvConversionErrorsToFS",
+ ErrorConverters.WriteErrorsToTextIO.newBuilder()
+ .setErrorWritePath(options.getNonTokenizedDeadLetterPath())
+ .setTranslateFunction(SerializableFunctions.getCsvErrorConverter())
+ .build());
+ }
+
+ /*
+ * Step 4: Get jsons that were successfully processed.
+ */
+ return jsons
+ .get(PROCESSING_OUT)
+ .apply(
+ "GetJson",
+ MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload));
+ }
+
+ private PCollectionTuple readCsv(Pipeline pipeline) {
+ return pipeline.apply(
+ "ReadCsvFromFiles",
+ CsvConverters.ReadCsv.newBuilder()
+ .setCsvFormat(options.getCsvFormat())
+ .setDelimiter(options.getCsvDelimiter())
+ .setHasHeaders(options.getCsvContainsHeaders())
+ .setInputFileSpec(options.getInputFilePattern())
+ .setHeaderTag(CSV_HEADERS)
+ .setLineTag(CSV_LINES)
+ .build());
+ }
+
+ private PCollectionTuple csvLineToJson(PCollectionTuple csvLines, String jsonSchema) {
+ return csvLines.apply(
+ "LineToJson",
+ CsvConverters.LineToFailsafeJson.newBuilder()
+ .setDelimiter(options.getCsvDelimiter())
+ .setJsonSchema(jsonSchema)
+ .setHeaderTag(CSV_HEADERS)
+ .setLineTag(CSV_LINES)
+ .setUdfOutputTag(PROCESSING_OUT)
+ .setUdfDeadletterTag(PROCESSING_DEADLETTER_OUT)
+ .build());
+ }
+
+  public PDone write(PCollection<Row> input, Schema schema) {
+ switch (options.getOutputFileFormat()) {
+ case JSON:
+ return writeJson(input);
+ case CSV:
+ return writeCsv(input, schema);
+ default:
+ throw new IllegalStateException(
+ "No valid format for output data is provided. Please, choose JSON or CSV.");
+ }
+ }
+
+  private PDone writeJson(PCollection<Row> input) {
+    PCollection<String> jsons = input.apply("RowsToJSON", ToJson.of());
+
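+    // Unbounded (streaming) collections require windowed writes; a single shard keeps the
+    // per-window output to one file.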
+ if (jsons.isBounded() == IsBounded.BOUNDED) {
+ return jsons.apply("WriteToFS", TextIO.write().to(options.getOutputDirectory()));
+ } else {
+ return jsons.apply(
+ "WriteToFS",
+ TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutputDirectory()));
+ }
+ }
+
+  private PDone writeCsv(PCollection<Row> input, Schema schema) {
+ String header = String.join(options.getCsvDelimiter(), schema.getFieldNames());
+ String csvDelimiter = options.getCsvDelimiter();
+
+ PCollection<String> csvs =
+ input.apply(
+ "ConvertToCSV",
+ MapElements.into(TypeDescriptors.strings())
+ .via((Row inputRow) -> new RowToCsv(csvDelimiter).getCsvFromRow(inputRow)));
+
+ if (csvs.isBounded() == IsBounded.BOUNDED) {
+ return csvs.apply(
+ "WriteToFS", TextIO.write().to(options.getOutputDirectory()).withHeader(header));
+ } else {
+ return csvs.apply(
+ "WriteToFS",
+ TextIO.write()
+ .withWindowedWrites()
+ .withNumShards(1)
+ .to(options.getOutputDirectory())
+ .withHeader(header));
+ }
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/package-info.java
new file mode 100644
index 0000000000000..0c1c03c0295c1
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright (C) 2020 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/** Protegrity Data Tokenization template for Google Cloud Teleport. */
+package org.apache.beam.examples.complete.datatokenization.transforms.io;
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/package-info.java
new file mode 100644
index 0000000000000..5530e633d727a
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright (C) 2020 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/** Protegrity Data Tokenization template for Google Cloud Teleport. */
+package org.apache.beam.examples.complete.datatokenization.transforms;
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java
new file mode 100644
index 0000000000000..090abcb80eb50
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.utils;
+
+import static org.apache.beam.examples.complete.datatokenization.utils.SchemasUtils.getGcsFileAsString;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.stream.JsonWriter;
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sample;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Common transforms for Csv files. */
+@SuppressWarnings({"argument.type.incompatible"})
+public class CsvConverters {
+
+ /* Logger for class. */
+ private static final Logger LOG = LoggerFactory.getLogger(CsvConverters.class);
+
+ private static final String SUCCESSFUL_TO_JSON_COUNTER = "SuccessfulToJsonCounter";
+
+ private static final String FAILED_TO_JSON_COUNTER = "FailedToJsonCounter";
+
+ private static JsonParser jsonParser = new JsonParser();
+
+ /**
+ * Builds Json string from list of values and headers or values and schema if schema is provided.
+ *
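+ * <p>For example (values are illustrative): with headers {@code name} and {@code age}, values
+ * {@code Alice} and {@code 42}, and no schema, the result is a Json object mapping {@code name}
+ * to {@code Alice} and {@code age} to {@code 42}, both written as strings.
+ *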
+ * @param headers optional list of strings which is the header of the Csv file.
+ * @param values list of strings which are combined with header or json schema to create Json
+ * string.
+ * @param jsonSchemaString optional Json schema, as a string, used to type the values.
+ * @return Json string containing object.
+ * @throws IOException thrown if Json object is not able to be written.
+ * @throws NumberFormatException thrown if value cannot be parsed into type successfully.
+ */
+ static String buildJsonString(
+ @Nullable List headers, List values, @Nullable String jsonSchemaString)
+ throws Exception {
+
+ StringWriter stringWriter = new StringWriter();
+ JsonWriter writer = new JsonWriter(stringWriter);
+
+ if (jsonSchemaString != null) {
+ JsonArray jsonSchema = jsonParser.parse(jsonSchemaString).getAsJsonArray();
+ writer.beginObject();
+
+ for (int i = 0; i < jsonSchema.size(); i++) {
+ JsonObject jsonObject = jsonSchema.get(i).getAsJsonObject();
+ String type = jsonObject.get("type").getAsString().toUpperCase();
+ writer.name(jsonObject.get("name").getAsString());
+
+ switch (type) {
+ case "LONG":
+ writer.value(Long.parseLong(values.get(i)));
+ break;
+
+ case "DOUBLE":
+ writer.value(Double.parseDouble(values.get(i)));
+ break;
+
+ case "INTEGER":
+ writer.value(Integer.parseInt(values.get(i)));
+ break;
+
+ case "SHORT":
+ writer.value(Short.parseShort(values.get(i)));
+ break;
+
+ case "BYTE":
+ writer.value(Byte.parseByte(values.get(i)));
+ break;
+
+ case "FLOAT":
+ writer.value(Float.parseFloat(values.get(i)));
+ break;
+
+ case "TEXT":
+ case "KEYWORD":
+ case "STRING":
+ writer.value(values.get(i));
+ break;
+
+ default:
+ LOG.error("Invalid data type, got: " + type);
+ throw new RuntimeException("Invalid data type, got: " + type);
+ }
+ }
+ writer.endObject();
+ writer.close();
+ return stringWriter.toString();
+
+ } else if (headers != null) {
+
+ writer.beginObject();
+
+ for (int i = 0; i < headers.size(); i++) {
+ writer.name(headers.get(i));
+ writer.value(values.get(i));
+ }
+
+ writer.endObject();
+ writer.close();
+ return stringWriter.toString();
+
+ } else {
+ LOG.error("No headers or schema specified");
+ throw new RuntimeException("No headers or schema specified");
+ }
+ }
+
+ /**
+ * Gets the Csv format according to Apache Commons CSV. If the user passes an invalid format
+ * name, an error is thrown.
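+ *
+ * <p>For example (an illustrative call), {@code getCsvFormat("Default", "|")} returns the Apache
+ * Commons CSV default format with its delimiter overridden to {@code '|'}.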
+ */
+ public static CSVFormat getCsvFormat(String formatString, @Nullable String delimiter) {
+
+ CSVFormat format = CSVFormat.Predefined.valueOf(formatString).getFormat();
+
+ // If a delimiter has been passed set it here.
+ if (delimiter != null) {
+ return format.withDelimiter(delimiter.charAt(0));
+ }
+ return format;
+ }
+
+ /** Necessary {@link PipelineOptions} options for Csv Pipelines. */
+ public interface CsvPipelineOptions extends PipelineOptions {
+ @Description("Pattern to where data lives, ex: gs://mybucket/somepath/*.csv")
+ String getInputFileSpec();
+
+ void setInputFileSpec(String inputFileSpec);
+
+ @Description("If file(s) contain headers")
+ Boolean getContainsHeaders();
+
+ void setContainsHeaders(Boolean containsHeaders);
+
+ @Description("Deadletter table for failed inserts in form: :.
")
+ String getDeadletterTable();
+
+ void setDeadletterTable(String deadletterTable);
+
+ @Description("Delimiting character. Default: use delimiter provided in csvFormat")
+ @Default.InstanceFactory(DelimiterFactory.class)
+ String getDelimiter();
+
+ void setDelimiter(String delimiter);
+
+ @Description(
+ "Csv format according to Apache Commons CSV format. Default is: Apache Commons CSV"
+ + " default\n"
+ + "https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.html#DEFAULT\n"
+ + "Must match format names exactly found at: "
+ + "https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.Predefined.html")
+ @Default.String("Default")
+ String getCsvFormat();
+
+ void setCsvFormat(String csvFormat);
+
+ @Description("Optional: Path to JSON schema, ex gs://path/to/schema. ")
+ String getJsonSchemaPath();
+
+ void setJsonSchemaPath(String jsonSchemaPath);
+
+ @Description("Set to true if number of files is in the tens of thousands. Default: false")
+ @Default.Boolean(false)
+ Boolean getLargeNumFiles();
+
+ void setLargeNumFiles(Boolean largeNumFiles);
+ }
+
+ /**
+ * Default value factory to get delimiter from Csv format so that if the user does not pass one
+ * in, it matches the supplied {@link CsvPipelineOptions#getCsvFormat()}.
+ */
+ public static class DelimiterFactory implements DefaultValueFactory<String> {
+
+ @Override
+ public String create(PipelineOptions options) {
+ CSVFormat csvFormat = getCsvFormat(options.as(CsvPipelineOptions.class).getCsvFormat(), null);
+ return String.valueOf(csvFormat.getDelimiter());
+ }
+ }
+
+ /**
+ * The {@link LineToFailsafeJson} transform converts a line from a Csv file into a Json string.
+ * It uses either the Json schema or the headers of the file to create the Json object, which is
+ * then added to the {@link FailsafeElement} as the new payload.
+ */
+ @AutoValue
+ public abstract static class LineToFailsafeJson
+ extends PTransform<PCollectionTuple, PCollectionTuple> {
+
+ public static Builder newBuilder() {
+ return new AutoValue_CsvConverters_LineToFailsafeJson.Builder();
+ }
+
+ public abstract String delimiter();
+
+ @Nullable
+ public abstract String jsonSchemaPath();
+
+ @Nullable
+ public abstract String jsonSchema();
+
+ public abstract TupleTag<String> headerTag();
+
+ public abstract TupleTag<String> lineTag();
+
+ public abstract TupleTag<FailsafeElement<String, String>> udfOutputTag();
+
+ public abstract TupleTag<FailsafeElement<String, String>> udfDeadletterTag();
+
+ @Override
+ public PCollectionTuple expand(PCollectionTuple lines) {
+
+ PCollectionView<String> headersView = null;
+
+ // Convert csv lines into Failsafe elements so that we can recover over multiple transforms.
+ PCollection<FailsafeElement<String, String>> lineFailsafeElements =
+ lines
+ .get(lineTag())
+ .apply("LineToFailsafeElement", ParDo.of(new LineToFailsafeElementFn()));
+
+ // If a json schema is available, use it rather than the headers.
+ String schemaPath = jsonSchemaPath();
+ if (schemaPath != null || jsonSchema() != null) {
+
+ String schema;
+ if (schemaPath != null) {
+ schema = getGcsFileAsString(schemaPath);
+ } else {
+ schema = jsonSchema();
+ }
+
+ return lineFailsafeElements.apply(
+ "LineToDocumentUsingSchema",
+ ParDo.of(
+ new FailsafeElementToJsonFn(
+ headersView, schema, delimiter(), udfDeadletterTag()))
+ .withOutputTags(udfOutputTag(), TupleTagList.of(udfDeadletterTag())));
+ }
+
+ // Run if using headers
+ headersView = lines.get(headerTag()).apply(Sample.any(1)).apply(View.asSingleton());
+
+ PCollectionView<String> finalHeadersView = headersView;
+ lines
+ .get(headerTag())
+ .apply(
+ "CheckHeaderConsistency",
+ ParDo.of(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ String headers = c.sideInput(finalHeadersView);
+ if (!c.element().equals(headers)) {
+ LOG.error("Headers do not match, consistency cannot be guaranteed");
+ throw new RuntimeException(
+ "Headers do not match, consistency cannot be guaranteed");
+ }
+ }
+ })
+ .withSideInputs(finalHeadersView));
+
+ return lineFailsafeElements.apply(
+ "LineToDocumentWithHeaders",
+ ParDo.of(
+ new FailsafeElementToJsonFn(
+ headersView, jsonSchemaPath(), delimiter(), udfDeadletterTag()))
+ .withSideInputs(headersView)
+ .withOutputTags(udfOutputTag(), TupleTagList.of(udfDeadletterTag())));
+ }
+
+ /** Builder for {@link LineToFailsafeJson}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setDelimiter(String delimiter);
+
+ public abstract Builder setJsonSchemaPath(String jsonSchemaPath);
+
+ public abstract Builder setJsonSchema(String jsonSchema);
+
+ public abstract Builder setHeaderTag(TupleTag<String> headerTag);
+
+ public abstract Builder setLineTag(TupleTag<String> lineTag);
+
+ public abstract Builder setUdfOutputTag(
+ TupleTag<FailsafeElement<String, String>> udfOutputTag);
+
+ public abstract Builder setUdfDeadletterTag(
+ TupleTag<FailsafeElement<String, String>> udfDeadletterTag);
+
+ public abstract LineToFailsafeJson build();
+ }
+ }
+
+ /**
+ * The {@link FailsafeElementToJsonFn} class creates a Json string from a failsafe element.
+ *
+ * <p>{@link FailsafeElementToJsonFn#FailsafeElementToJsonFn(PCollectionView, String, String,
+ * TupleTag)}
+ */
+ public static class FailsafeElementToJsonFn
+ extends DoFn<FailsafeElement<String, String>, FailsafeElement<String, String>> {
+
+ @Nullable public final String jsonSchema;
+ public final String delimiter;
+ public final TupleTag<FailsafeElement<String, String>> udfDeadletterTag;
+ @Nullable private final PCollectionView<String> headersView;
+ private Counter successCounter =
+ Metrics.counter(FailsafeElementToJsonFn.class, SUCCESSFUL_TO_JSON_COUNTER);
+ private Counter failedCounter =
+ Metrics.counter(FailsafeElementToJsonFn.class, FAILED_TO_JSON_COUNTER);
+
+ FailsafeElementToJsonFn(
+ PCollectionView<String> headersView,
+ String jsonSchema,
+ String delimiter,
+ TupleTag<FailsafeElement<String, String>> udfDeadletterTag) {
+ this.headersView = headersView;
+ this.jsonSchema = jsonSchema;
+ this.delimiter = delimiter;
+ this.udfDeadletterTag = udfDeadletterTag;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ FailsafeElement<String, String> element = context.element();
+ List<String> header = null;
+
+ if (this.headersView != null) {
+ header = Arrays.asList(context.sideInput(this.headersView).split(this.delimiter));
+ }
+
+ List<String> record = Arrays.asList(element.getOriginalPayload().split(this.delimiter));
+
+ try {
+ String json = buildJsonString(header, record, this.jsonSchema);
+ context.output(FailsafeElement.of(element.getOriginalPayload(), json));
+ successCounter.inc();
+ } catch (Exception e) {
+ failedCounter.inc();
+ context.output(
+ this.udfDeadletterTag,
+ FailsafeElement.of(element)
+ .setErrorMessage(e.getMessage())
+ .setStacktrace(Throwables.getStackTraceAsString(e)));
+ }
+ }
+ }
+
+ /**
+ * The {@link LineToFailsafeElementFn} wraps a csv line with the {@link FailsafeElement} class so
+ * errors can be recovered from and the original message can be output to an error records table.
+ */
+ static class LineToFailsafeElementFn extends DoFn<String, FailsafeElement<String, String>> {
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ String message = context.element();
+ context.output(FailsafeElement.of(message, message));
+ }
+ }
+
+ /**
+ * The {@link ReadCsv} class is a {@link PTransform} that reads from one or more Csv files. The
+ * transform returns a {@link PCollectionTuple} consisting of the following {@link PCollection}s:
+ *
+ * <ul>
+ *   <li>{@link ReadCsv#headerTag()} - Contains headers found in files if read with headers,
+ *       contains empty {@link PCollection} if no headers.
+ *   <li>{@link ReadCsv#lineTag()} - Contains Csv lines as a {@link PCollection} of strings.
+ * </ul>
+ *
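+ * <p>A usage sketch (format, delimiter, path, and tags are illustrative; see the pipeline code
+ * for real values):
+ *
+ * <pre>{@code
+ * PCollectionTuple csv =
+ *     pipeline.apply(
+ *         CsvConverters.ReadCsv.newBuilder()
+ *             .setCsvFormat("Default")
+ *             .setDelimiter(",")
+ *             .setHasHeaders(true)
+ *             .setInputFileSpec("gs://bucket/*.csv")
+ *             .setHeaderTag(CSV_HEADERS)
+ *             .setLineTag(CSV_LINES)
+ *             .build());
+ * }</pre>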
+ */
+ @AutoValue
+ public abstract static class ReadCsv extends PTransform<PBegin, PCollectionTuple> {
+
+ public static Builder newBuilder() {
+ return new AutoValue_CsvConverters_ReadCsv.Builder();
+ }
+
+ public abstract String csvFormat();
+
+ @Nullable
+ public abstract String delimiter();
+
+ public abstract Boolean hasHeaders();
+
+ public abstract String inputFileSpec();
+
+ public abstract TupleTag<String> headerTag();
+
+ public abstract TupleTag<String> lineTag();
+
+ @Override
+ public PCollectionTuple expand(PBegin input) {
+
+ if (hasHeaders()) {
+ return input
+ .apply("MatchFilePattern", FileIO.match().filepattern(inputFileSpec()))
+ .apply("ReadMatches", FileIO.readMatches())
+ .apply(
+ "ReadCsvWithHeaders",
+ ParDo.of(new GetCsvHeadersFn(headerTag(), lineTag(), csvFormat(), delimiter()))
+ .withOutputTags(headerTag(), TupleTagList.of(lineTag())));
+ }
+
+ return PCollectionTuple.of(
+ lineTag(), input.apply("ReadCsvWithoutHeaders", TextIO.read().from(inputFileSpec())));
+ }
+
+ /** Builder for {@link ReadCsv}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setCsvFormat(String csvFormat);
+
+ public abstract Builder setDelimiter(@Nullable String delimiter);
+
+ public abstract Builder setHasHeaders(Boolean hasHeaders);
+
+ public abstract Builder setInputFileSpec(String inputFileSpec);
+
+ public abstract Builder setHeaderTag(TupleTag<String> headerTag);
+
+ public abstract Builder setLineTag(TupleTag<String> lineTag);
+
+ abstract ReadCsv autoBuild();
+
+ public ReadCsv build() {
+
+ ReadCsv readCsv = autoBuild();
+
+ checkArgument(readCsv.inputFileSpec() != null, "Input file spec must be provided.");
+
+ checkArgument(readCsv.csvFormat() != null, "Csv format must not be null.");
+
+ checkArgument(readCsv.hasHeaders() != null, "Header information must be provided.");
+
+ return readCsv;
+ }
+ }
+ }
+
+ /**
+ * The {@link GetCsvHeadersFn} class gets the header of a Csv file and outputs it as a string. The
+ * csv format provided in {@link CsvConverters#getCsvFormat(String, String)} is used to get the
+ * header.
+ */
+ static class GetCsvHeadersFn extends DoFn<ReadableFile, String> {
+
+ private final TupleTag<String> headerTag;
+ private final TupleTag<String> linesTag;
+ private CSVFormat csvFormat;
+
+ GetCsvHeadersFn(
+ TupleTag<String> headerTag, TupleTag<String> linesTag, String csvFormat, String delimiter) {
+ this.headerTag = headerTag;
+ this.linesTag = linesTag;
+ this.csvFormat = getCsvFormat(csvFormat, delimiter);
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context, MultiOutputReceiver outputReceiver) {
+ ReadableFile f = context.element();
+ String headers;
+ List<String> records = null;
+ String delimiter = String.valueOf(this.csvFormat.getDelimiter());
+ try {
+ String csvFileString = f.readFullyAsUTF8String();
+ StringReader reader = new StringReader(csvFileString);
+ CSVParser parser = CSVParser.parse(reader, this.csvFormat.withFirstRecordAsHeader());
+ records =
+ parser.getRecords().stream()
+ .map(i -> String.join(delimiter, i))
+ .collect(Collectors.toList());
+ headers = String.join(delimiter, parser.getHeaderNames());
+ } catch (IOException ioe) {
+ LOG.error("Headers do not match, consistency cannot be guaranteed");
+ throw new RuntimeException("Could not read Csv headers: " + ioe.getMessage());
+ }
+ outputReceiver.get(this.headerTag).output(headers);
+ records.forEach(r -> outputReceiver.get(this.linesTag).output(r));
+ }
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/DurationUtils.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/DurationUtils.java
new file mode 100644
index 0000000000000..83d3aea3acb5e
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/DurationUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.utils;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Locale;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.MutablePeriod;
+import org.joda.time.format.PeriodFormatterBuilder;
+import org.joda.time.format.PeriodParser;
+
+/**
+ * The {@link DurationUtils} class provides common utilities for manipulating and formatting {@link
+ * Duration} objects.
+ */
+public class DurationUtils {
+
+ /**
+ * Parses a duration from a period formatted string. Values are accepted in the following formats:
+ *
+ * <ul>
+ *   <li>Ns - Seconds. Example: 5s
+ *   <li>Nm - Minutes. Example: 12m
+ *   <li>Nh - Hours. Example: 2h
+ * </ul>
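+ *
+ * <p>For example, {@code parseDuration("30s")} is expected to yield a 30-second {@link Duration}.
+ *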
+ * @param value The period value to parse.
+ * @return The {@link Duration} parsed from the supplied period string.
+ */
+ public static Duration parseDuration(String value) {
+ checkNotNull(value, "The specified duration must be a non-null value!");
+
+ PeriodParser parser =
+ new PeriodFormatterBuilder()
+ .appendSeconds()
+ .appendSuffix("s")
+ .appendMinutes()
+ .appendSuffix("m")
+ .appendHours()
+ .appendSuffix("h")
+ .toParser();
+
+ MutablePeriod period = new MutablePeriod();
+ parser.parseInto(period, value, 0, Locale.getDefault());
+
+ Duration duration = period.toDurationFrom(new DateTime(0));
+ checkArgument(duration.getMillis() > 0, "The window duration must be greater than 0!");
+
+ return duration;
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java
new file mode 100644
index 0000000000000..04beac038926c
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.utils;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/** Transforms & DoFns & Options for Teleport Error logging. */
+public class ErrorConverters {
+
+ /**
+ * Writes all errors to GCS; place this at the end of your pipeline.
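+ *
+ * <p>A usage sketch (path and delimiter are illustrative):
+ *
+ * <pre>{@code
+ * failsafeElements.apply(
+ *     "WriteErrorsAsCsv",
+ *     WriteStringMessageErrorsAsCsv.newBuilder()
+ *         .setErrorWritePath("gs://bucket/errors")
+ *         .setCsvDelimiter(",")
+ *         .build());
+ * }</pre>
+ */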
+ @AutoValue
+ public abstract static class WriteStringMessageErrorsAsCsv
+ extends PTransform<PCollection<FailsafeElement<String, String>>, PDone> {
+
+ public static Builder newBuilder() {
+ return new AutoValue_ErrorConverters_WriteStringMessageErrorsAsCsv.Builder();
+ }
+
+ public abstract String errorWritePath();
+
+ public abstract String csvDelimiter();
+
+ @Nullable
+ public abstract Duration windowDuration();
+
+ @SuppressWarnings("argument.type.incompatible")
+ @Override
+ public PDone expand(PCollection<FailsafeElement<String, String>> pCollection) {
+
+ PCollection<String> formattedErrorRows =
+ pCollection.apply(
+ "GetFormattedErrorRow", ParDo.of(new FailedStringToCsvRowFn(csvDelimiter())));
+
+ if (pCollection.isBounded() == IsBounded.UNBOUNDED) {
+ if (windowDuration() != null) {
+ formattedErrorRows =
+ formattedErrorRows.apply(Window.into(FixedWindows.of(windowDuration())));
+ }
+ return formattedErrorRows.apply(
+ TextIO.write().to(errorWritePath()).withNumShards(1).withWindowedWrites());
+
+ } else {
+ return formattedErrorRows.apply(TextIO.write().to(errorWritePath()).withNumShards(1));
+ }
+ }
+
+ /** Builder for {@link WriteStringMessageErrorsAsCsv}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setErrorWritePath(String errorWritePath);
+
+ public abstract Builder setCsvDelimiter(String csvDelimiter);
+
+ public abstract Builder setWindowDuration(@Nullable Duration duration);
+
+ public abstract WriteStringMessageErrorsAsCsv build();
+ }
+ }
+
+ /**
+ * The {@link FailedStringToCsvRowFn} converts string objects which have failed processing into
+ * CSV-formatted {@link String} objects which can be output to a filesystem.
+ */
+ public static class FailedStringToCsvRowFn extends DoFn<FailsafeElement<String, String>, String> {
+
+ /**
+ * The formatter used to convert timestamps into a BigQuery compatible format.
+ */
+ private static final DateTimeFormatter TIMESTAMP_FORMATTER =
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+
+ private final String csvDelimiter;
+
+ public FailedStringToCsvRowFn(String csvDelimiter) {
+ this.csvDelimiter = csvDelimiter;
+ }
+
+ public FailedStringToCsvRowFn() {
+ this.csvDelimiter = ",";
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ FailsafeElement<String, String> failsafeElement = context.element();
+ ArrayList<String> outputRow = new ArrayList<>();
+ final String message = failsafeElement.getOriginalPayload();
+
+ // Format the timestamp for insertion
+ String timestamp =
+ TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC));
+
+ outputRow.add(timestamp);
+ outputRow.add(failsafeElement.getErrorMessage());
+
+ // Only set the payload if it's populated on the message.
+ if (message != null) {
+ outputRow.add(message);
+ }
+
+ context.output(String.join(csvDelimiter, outputRow));
+ }
+ }
+
+ /** Write errors as string encoded messages. */
+ @AutoValue
+ public abstract static class WriteStringMessageErrors
+ extends PTransform<PCollection<FailsafeElement<String, String>>, WriteResult> {
+
+ public static Builder newBuilder() {
+ return new AutoValue_ErrorConverters_WriteStringMessageErrors.Builder();
+ }
+
+ public abstract String getErrorRecordsTable();
+
+ public abstract String getErrorRecordsTableSchema();
+
+ @Override
+ public WriteResult expand(PCollection<FailsafeElement<String, String>> failedRecords) {
+
+ return failedRecords
+ .apply("FailedRecordToTableRow", ParDo.of(new FailedStringToTableRowFn()))
+ .apply(
+ "WriteFailedRecordsToBigQuery",
+ BigQueryIO.writeTableRows()
+ .to(getErrorRecordsTable())
+ .withJsonSchema(getErrorRecordsTableSchema())
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+ }
+
+ /** Builder for {@link WriteStringMessageErrors}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setErrorRecordsTable(String errorRecordsTable);
+
+ public abstract Builder setErrorRecordsTableSchema(String errorRecordsTableSchema);
+
+ public abstract WriteStringMessageErrors build();
+ }
+ }
+
+ /**
+ * The {@link FailedStringToTableRowFn} converts string objects which have failed processing into
+ * {@link TableRow} objects which can be output to a dead-letter table.
+ */
+ public static class FailedStringToTableRowFn
+ extends DoFn<FailsafeElement<String, String>, TableRow> {
+
+ /**
+ * The formatter used to convert timestamps into a BigQuery compatible format.
+ */
+ private static final DateTimeFormatter TIMESTAMP_FORMATTER =
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ FailsafeElement<String, String> failsafeElement = context.element();
+ final String message = failsafeElement.getOriginalPayload();
+
+ // Format the timestamp for insertion
+ String timestamp =
+ TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC));
+
+ // Build the table row
+ final TableRow failedRow =
+ new TableRow()
+ .set("timestamp", timestamp)
+ .set("errorMessage", failsafeElement.getErrorMessage())
+ .set("stacktrace", failsafeElement.getStacktrace());
+
+ // Only set the payload if it's populated on the message.
+ if (message != null) {
+ failedRow
+ .set("payloadString", message)
+ .set("payloadBytes", message.getBytes(StandardCharsets.UTF_8));
+ }
+
+ context.output(failedRow);
+ }
+ }
+
+ /**
+ * {@link WriteErrorsToTextIO} is a {@link PTransform} that writes string error messages to the
+ * file system using TextIO and a custom line-format {@link SerializableFunction} to convert each
+ * error into the necessary format.
+ *
+ * <p>Example of usage in a pipeline:
+ *
+ * <pre>{@code
+ * pCollection.apply("Write to TextIO",
+ * WriteErrorsToTextIO.<String, String>newBuilder()
+ * .setErrorWritePath("errors.txt")
+ * .setTranslateFunction((FailsafeElement<String, String> failsafeElement) -> {
+ * ArrayList<String> outputRow = new ArrayList<>();
+ * final String message = failsafeElement.getOriginalPayload();
+ * String timestamp = Instant.now().toString();
+ * outputRow.add(timestamp);
+ * outputRow.add(failsafeElement.getErrorMessage());
+ * outputRow.add(failsafeElement.getStacktrace());
+ * // Only set the payload if it's populated on the message.
+ * if (failsafeElement.getOriginalPayload() != null) {
+ * outputRow.add(message);
+ * }
+ *
+ * return String.join(",",outputRow);
+ * })
+ * }</pre>
+ */
+ @AutoValue
+ public abstract static class WriteErrorsToTextIO<T, V>
+ extends PTransform<PCollection<FailsafeElement<T, V>>, PDone> {
+
+ public static <T, V> WriteErrorsToTextIO.Builder<T, V> newBuilder() {
+ return new AutoValue_ErrorConverters_WriteErrorsToTextIO.Builder<>();
+ }
+
+ public abstract String errorWritePath();
+
+ public abstract SerializableFunction<FailsafeElement<T, V>, String> translateFunction();
+
+ @Nullable
+ public abstract Duration windowDuration();
+
+ @Override
+ @SuppressWarnings("argument.type.incompatible")
+ public PDone expand(PCollection<FailsafeElement<T, V>> pCollection) {
+
+ PCollection formattedErrorRows =
+ pCollection.apply(
+ "GetFormattedErrorRow",
+ MapElements.into(TypeDescriptors.strings()).via(translateFunction()));
+
+ if (pCollection.isBounded() == PCollection.IsBounded.UNBOUNDED) {
+ if (windowDuration() == null) {
+ throw new RuntimeException("Unbounded input requires window interval to be set");
+ }
+ return formattedErrorRows
+ .apply(Window.into(FixedWindows.of(windowDuration())))
+ .apply(TextIO.write().to(errorWritePath()).withNumShards(1).withWindowedWrites());
+ }
+
+ return formattedErrorRows.apply(TextIO.write().to(errorWritePath()).withNumShards(1));
+ }
+
+ /** Builder for {@link WriteErrorsToTextIO}. */
+ @AutoValue.Builder
+ public abstract static class Builder<T, V> {
+
+ public abstract WriteErrorsToTextIO.Builder<T, V> setErrorWritePath(String errorWritePath);
+
+ public abstract WriteErrorsToTextIO.Builder<T, V> setTranslateFunction(
+ SerializableFunction<FailsafeElement<T, V>, String> translateFunction);
+
+ public abstract WriteErrorsToTextIO.Builder<T, V> setWindowDuration(
+ @Nullable Duration duration);
+
+ abstract SerializableFunction<FailsafeElement<T, V>, String> translateFunction();
+
+ abstract WriteErrorsToTextIO<T, V> autoBuild();
+
+ public WriteErrorsToTextIO<T, V> build() {
+ checkNotNull(translateFunction(), "translateFunction is required.");
+ return autoBuild();
+ }
+ }
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/FailsafeElement.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/FailsafeElement.java
new file mode 100644
index 0000000000000..66f1c3c176a8f
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/FailsafeElement.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.utils;
+
+import java.util.Objects;
+import org.apache.avro.reflect.Nullable;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
+
+/**
+ * The {@link FailsafeElement} class holds the current value and original value of a record within a
+ * pipeline. This class allows pipelines to not lose valuable information about an incoming record
+ * throughout the processing of that record. The use of this class allows for more robust
+ * dead-letter strategies as the original record information is not lost throughout the pipeline and
+ * can be output to a dead-letter in the event of a failure during one of the pipeline's transforms.
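+ *
+ * <p>A usage sketch (payload values are illustrative):
+ *
+ * <pre>{@code
+ * FailsafeElement<String, String> element =
+ *     FailsafeElement.of(rawCsvLine, rawCsvLine)
+ *         .setErrorMessage("Malformed record")
+ *         .setStacktrace(stacktraceString);
+ * }</pre>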
+ */
+@DefaultCoder(FailsafeElementCoder.class)
+public class FailsafeElement<OriginalT, CurrentT> {
+
+ private final OriginalT originalPayload;
+ private final CurrentT payload;
+ @Nullable private String errorMessage = "";
+ @Nullable private String stacktrace = "";
+
+ private FailsafeElement(OriginalT originalPayload, CurrentT payload) {
+ this.originalPayload = originalPayload;
+ this.payload = payload;
+ }
+
+ public static <OriginalT, CurrentT> FailsafeElement<OriginalT, CurrentT> of(
+ OriginalT originalPayload, CurrentT currentPayload) {
+ return new FailsafeElement<>(originalPayload, currentPayload);
+ }
+
+ public static <OriginalT, CurrentT> FailsafeElement<OriginalT, CurrentT> of(
+ FailsafeElement<OriginalT, CurrentT> other) {
+ return new FailsafeElement<>(other.originalPayload, other.payload)
+ .setErrorMessage(other.getErrorMessage())
+ .setStacktrace(other.getStacktrace());
+ }
+
+ public OriginalT getOriginalPayload() {
+ return originalPayload;
+ }
+
+ public CurrentT getPayload() {
+ return payload;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public FailsafeElement<OriginalT, CurrentT> setErrorMessage(String errorMessage) {
+ this.errorMessage = errorMessage;
+ return this;
+ }
+
+ public String getStacktrace() {
+ return stacktrace;
+ }
+
+ public FailsafeElement<OriginalT, CurrentT> setStacktrace(String stacktrace) {
+ this.stacktrace = stacktrace;
+ return this;
+ }
+
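+ /**
+ * Indicates whether some other object is "equal to" this one. A minimal sketch added for
+ * consistency with {@link #hashCode()}, comparing the same four fields.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final FailsafeElement<?, ?> other = (FailsafeElement<?, ?>) obj;
+ return Objects.deepEquals(this.originalPayload, other.getOriginalPayload())
+ && Objects.deepEquals(this.payload, other.getPayload())
+ && Objects.equals(this.errorMessage, other.getErrorMessage())
+ && Objects.equals(this.stacktrace, other.getStacktrace());
+ }
+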
+ @Override
+ public int hashCode() {
+ return Objects.hash(originalPayload, payload, errorMessage, stacktrace);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("originalPayload", originalPayload)
+ .add("payload", payload)
+ .add("errorMessage", errorMessage)
+ .add("stacktrace", stacktrace)
+ .toString();
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/FailsafeElementCoder.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/FailsafeElementCoder.java
new file mode 100644
index 0000000000000..151d98a9070ec
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/FailsafeElementCoder.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+
+/**
+ * The {@link FailsafeElementCoder} encodes and decodes {@link FailsafeElement} objects.
+ *
+ * <p>This coder is necessary until Avro supports parameterized types (AVRO-1571) without
+ * requiring the schema for the type to be explicitly specified.
+ *
+ * @param <OriginalT> The type of the original payload to be encoded.
+ * @param <CurrentT> The type of the current payload to be encoded.
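+ *
+ * <p>A registration sketch (pipeline and payload types are illustrative):
+ *
+ * <pre>{@code
+ * FailsafeElementCoder<String, String> coder =
+ *     FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
+ * pipeline.getCoderRegistry().registerCoderForType(coder.getEncodedTypeDescriptor(), coder);
+ * }</pre>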
+ */
+public class FailsafeElementCoder<OriginalT, CurrentT>
+ extends CustomCoder<FailsafeElement<OriginalT, CurrentT>> {
+
+ private static final NullableCoder<String> STRING_CODER = NullableCoder.of(StringUtf8Coder.of());
+ private final Coder<OriginalT> originalPayloadCoder;
+ private final Coder<CurrentT> currentPayloadCoder;
+
+ private FailsafeElementCoder(
+ Coder<OriginalT> originalPayloadCoder, Coder<CurrentT> currentPayloadCoder) {
+ this.originalPayloadCoder = originalPayloadCoder;
+ this.currentPayloadCoder = currentPayloadCoder;
+ }
+
+ public Coder<OriginalT> getOriginalPayloadCoder() {
+ return originalPayloadCoder;
+ }
+
+ public Coder<CurrentT> getCurrentPayloadCoder() {
+ return currentPayloadCoder;
+ }
+
+ public static <OriginalT, CurrentT> FailsafeElementCoder<OriginalT, CurrentT> of(
+ Coder<OriginalT> originalPayloadCoder, Coder<CurrentT> currentPayloadCoder) {
+ return new FailsafeElementCoder<>(originalPayloadCoder, currentPayloadCoder);
+ }
+
+ @Override
+ public void encode(FailsafeElement<OriginalT, CurrentT> value, OutputStream outStream)
+ throws IOException {
+ if (value == null) {
+ throw new CoderException("The FailsafeElementCoder cannot encode a null object!");
+ }
+
+ originalPayloadCoder.encode(value.getOriginalPayload(), outStream);
+ currentPayloadCoder.encode(value.getPayload(), outStream);
+ STRING_CODER.encode(value.getErrorMessage(), outStream);
+ STRING_CODER.encode(value.getStacktrace(), outStream);
+ }
+
+ @Override
+ public FailsafeElement<OriginalT, CurrentT> decode(InputStream inStream) throws IOException {
+
+ OriginalT originalPayload = originalPayloadCoder.decode(inStream);
+ CurrentT currentPayload = currentPayloadCoder.decode(inStream);
+ String errorMessage = STRING_CODER.decode(inStream);
+ String stacktrace = STRING_CODER.decode(inStream);
+
+ return FailsafeElement.of(originalPayload, currentPayload)
+ .setErrorMessage(errorMessage)
+ .setStacktrace(stacktrace);
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Arrays.asList(originalPayloadCoder, currentPayloadCoder);
+ }
+
+ @Override
+ public TypeDescriptor<FailsafeElement<OriginalT, CurrentT>> getEncodedTypeDescriptor() {
+ return new TypeDescriptor<FailsafeElement<OriginalT, CurrentT>>() {}.where(
+ new TypeParameter<OriginalT>() {}, originalPayloadCoder.getEncodedTypeDescriptor())
+ .where(new TypeParameter<CurrentT>() {}, currentPayloadCoder.getEncodedTypeDescriptor());
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/RowToCsv.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/RowToCsv.java
new file mode 100644
index 0000000000000..f8ab7d4ff39f6
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/RowToCsv.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.utils;
+
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * The {@link RowToCsv} class converts Beam {@link Row} instances into CSV-formatted strings.
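+ *
+ * <p>For example (the row is illustrative): {@code new RowToCsv(";").getCsvFromRow(row)} joins
+ * the row's values with the delimiter, producing e.g. {@code "Alice;42"}.
+ */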
+public class RowToCsv {
+
+ private final String csvDelimiter;
+
+ public RowToCsv(String csvDelimiter) {
+ this.csvDelimiter = csvDelimiter;
+ }
+
+ public String getCsvFromRow(Row row) {
+ return row.getValues().stream()
+ .map(item -> item == null ? "null" : item)
+ .map(Object::toString)
+ .collect(Collectors.joining(csvDelimiter));
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java
new file mode 100644
index 0000000000000..e8dd24eabeeea
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.utils;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.fromTableSchema;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link SchemasUtils} class reads a JSON-based schema from a file or from a string. The
+ * local File System and GCS are currently supported.
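+ *
+ * <p>A usage sketch (the schema path is illustrative):
+ *
+ * <pre>{@code
+ * SchemasUtils schemasUtils = new SchemasUtils("gs://bucket/schema.json", StandardCharsets.UTF_8);
+ * Schema beamSchema = schemasUtils.getBeamSchema();
+ * }</pre>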
+ */
+@SuppressWarnings({
+ "initialization.fields.uninitialized",
+ "method.invocation.invalid",
+ "dereference.of.nullable",
+ "argument.type.incompatible",
+ "return.type.incompatible"
+})
+public class SchemasUtils {
+
+ /* Logger for class.*/
+ private static final Logger LOG = LoggerFactory.getLogger(SchemasUtils.class);
+
+ private TableSchema bigQuerySchema;
+ private Schema beamSchema;
+ private String jsonBeamSchema;
+
+ public SchemasUtils(String schema) {
+ parseJson(schema);
+ }
+
+ public SchemasUtils(String path, Charset encoding) throws IOException {
+ if (path.startsWith("gs://")) {
+ parseJson(new String(readGcsFile(path), encoding));
+ } else {
+ byte[] encoded = Files.readAllBytes(Paths.get(path));
+ parseJson(new String(encoded, encoding));
+ }
+ LOG.info("Extracted schema: " + bigQuerySchema.toPrettyString());
+ }
+
+ public TableSchema getBigQuerySchema() {
+ return bigQuerySchema;
+ }
+
+ private void parseJson(String jsonSchema) throws UnsupportedOperationException {
+ TableSchema schema = BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
+ validateSchemaTypes(schema);
+ bigQuerySchema = schema;
+ jsonBeamSchema = BigQueryHelpers.toJsonString(schema.getFields());
+ }
+
+ private void validateSchemaTypes(TableSchema bigQuerySchema) {
+ try {
+ beamSchema = fromTableSchema(bigQuerySchema);
+ } catch (UnsupportedOperationException exception) {
+ LOG.error("Check json schema, {}", exception.getMessage());
+ } catch (NullPointerException npe) {
+ LOG.error("Missing schema keywords, please check what all required fields presented");
+ }
+ }
+
+ /**
+ * Method to read a schema file from GCS and return the file contents as a byte array.
+ *
+ * @param gcsFilePath path to file in GCS in format "gs://your-bucket/path/to/file"
+ * @return byte array with file contents
+ * @throws IOException thrown if not able to read file
+ */
+ public static byte[] readGcsFile(String gcsFilePath) throws IOException {
+ LOG.info("Reading contents from GCS file: {}", gcsFilePath);
+ // Read the GCS file into byte array and will throw an I/O exception in case file not found.
+ try (ReadableByteChannel readerChannel =
+ FileSystems.open(FileSystems.matchSingleFileSpec(gcsFilePath).resourceId())) {
+ try (InputStream stream = Channels.newInputStream(readerChannel)) {
+ return ByteStreams.toByteArray(stream);
+ }
+ }
+ }
+
+ public Schema getBeamSchema() {
+ return beamSchema;
+ }
+
+ public String getJsonBeamSchema() {
+ return jsonBeamSchema;
+ }
+
+ /**
+ * Reads a file from GCS and returns it as a string.
+ *
+ * @param filePath path to file in GCS
+ * @return contents of the file as a string
+ * @throws RuntimeException thrown if not able to read the file
+ */
+ public static String getGcsFileAsString(String filePath) {
+ MatchResult result;
+ try {
+ result = FileSystems.match(filePath);
+ checkArgument(
+ result.status() == MatchResult.Status.OK && !result.metadata().isEmpty(),
+ "Failed to match any files with the pattern: " + filePath);
+
+ List<ResourceId> rId =
+ result.metadata().stream()
+ .map(MatchResult.Metadata::resourceId)
+ .collect(Collectors.toList());
+
+ checkArgument(rId.size() == 1, "Expected exactly 1 file, but got " + rId.size() + " files.");
+
+ Reader reader =
+ Channels.newReader(FileSystems.open(rId.get(0)), StandardCharsets.UTF_8.name());
+
+ return CharStreams.toString(reader);
+
+ } catch (IOException ioe) {
+ LOG.error("File system i/o error: " + ioe.getMessage());
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ public static final String DEADLETTER_SCHEMA =
+ "{\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"timestamp\",\n"
+ + " \"type\": \"TIMESTAMP\",\n"
+ + " \"mode\": \"REQUIRED\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"payloadString\",\n"
+ + " \"type\": \"STRING\",\n"
+ + " \"mode\": \"REQUIRED\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"payloadBytes\",\n"
+ + " \"type\": \"BYTES\",\n"
+ + " \"mode\": \"REQUIRED\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"attributes\",\n"
+ + " \"type\": \"RECORD\",\n"
+ + " \"mode\": \"REPEATED\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"key\",\n"
+ + " \"type\": \"STRING\",\n"
+ + " \"mode\": \"NULLABLE\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"value\",\n"
+ + " \"type\": \"STRING\",\n"
+ + " \"mode\": \"NULLABLE\"\n"
+ + " }\n"
+ + " ]\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"errorMessage\",\n"
+ + " \"type\": \"STRING\",\n"
+ + " \"mode\": \"NULLABLE\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"stacktrace\",\n"
+ + " \"type\": \"STRING\",\n"
+ + " \"mode\": \"NULLABLE\"\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/package-info.java
new file mode 100644
index 0000000000000..e53a4f7ce0462
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright (C) 2020 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/** Protegrity Data Tokenization template for Google Cloud Teleport. */
+package org.apache.beam.examples.complete.datatokenization.utils;
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/datatokenization/DataTokenizationTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/datatokenization/DataTokenizationTest.java
new file mode 100644
index 0000000000000..a2b1ec05a2ec3
--- /dev/null
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/datatokenization/DataTokenizationTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.beam.examples.complete.datatokenization.options.DataTokenizationOptions;
+import org.apache.beam.examples.complete.datatokenization.transforms.io.TokenizationFileSystemIO;
+import org.apache.beam.examples.complete.datatokenization.transforms.io.TokenizationFileSystemIO.FORMAT;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElementCoder;
+import org.apache.beam.examples.complete.datatokenization.utils.RowToCsv;
+import org.apache.beam.examples.complete.datatokenization.utils.SchemasUtils;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test class for {@link DataTokenization}. */
+@RunWith(JUnit4.class)
+public class DataTokenizationTest {
+
+ private static final String testSchema =
+ "{\"fields\":[{\"mode\":\"REQUIRED\",\"name\":\"FieldName1\",\"type\":\"STRING\"},{\"mode\":\"REQUIRED\",\"name\":\"FieldName2\",\"type\":\"STRING\"}]}";
+ String[] fields = {"TestValue1", "TestValue2"};
+
+ @Rule public final transient TestPipeline testPipeline = TestPipeline.create();
+
+ private static final String RESOURCES_DIR = "./";
+
+ private static final String CSV_FILE_PATH =
+ Resources.getResource(RESOURCES_DIR + "testInput.csv").getPath();
+
+ private static final String JSON_FILE_PATH =
+ Resources.getResource(RESOURCES_DIR + "testInput.txt").getPath();
+
+ private static final String SCHEMA_FILE_PATH =
+ Resources.getResource(RESOURCES_DIR + "schema.txt").getPath();
+
+ private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
+ FailsafeElementCoder.of(
+ NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));
+
+ @Test
+ public void testGetBeamSchema() {
+ Schema expectedSchema =
+ Schema.builder()
+ .addField("FieldName1", FieldType.STRING)
+ .addField("FieldName2", FieldType.STRING)
+ .build();
+ SchemasUtils schemasUtils = new SchemasUtils(testSchema);
+ Assert.assertEquals(expectedSchema, schemasUtils.getBeamSchema());
+ }
+
+ @Test
+ public void testGetBigQuerySchema() {
+ SchemasUtils schemasUtils = new SchemasUtils(testSchema);
+ Assert.assertEquals(testSchema, schemasUtils.getBigQuerySchema().toString());
+ }
+
+ @Test
+ public void testRowToCSV() {
+ Schema beamSchema = new SchemasUtils(testSchema).getBeamSchema();
+ Row.Builder rowBuilder = Row.withSchema(beamSchema);
+ Row row = rowBuilder.addValues(new ArrayList<>(Arrays.asList(fields))).build();
+ String csvResult = new RowToCsv(";").getCsvFromRow(row);
+ Assert.assertEquals(String.join(";", fields), csvResult);
+ }
+
+ @Test
+ public void testRowToCSVWithNull() {
+ final String nullableTestSchema =
+ "{\"fields\":[{\"mode\":\"REQUIRED\",\"name\":\"FieldName1\",\"type\":\"STRING\"},{\"mode\":\"NULLABLE\",\"name\":\"FieldName2\",\"type\":\"STRING\"}]}";
+ final String expectedCsv = "TestValueOne;null";
+
+ List