From fd3075fa52303a0d684c2d7a91814eea5305476e Mon Sep 17 00:00:00 2001 From: Artur Khanin Date: Wed, 31 Mar 2021 04:43:03 +0300 Subject: [PATCH] Merge pull request #13995 from [BEAM-11322] Apache Beam Example to tokenize sensitive data * [WIP] Transfer from DataflowTemplates to Beam * Move to beam repo * moved convertors for GCSio * Renaming + readme * build errors * minimize suppress * remove UDF usages * Fixes for stylechecks * grooming for data protectors * grooming for data protectors * fix javadoc * Added support for window writing; Fixed ordering in tokenization process * supressed checkstyle errors for BigTableIO class * add data tokenization tests * Changed GCS to FileSystem and removed redundant function from SchemasUtils * Updated README.md for local run with BigQuery sink * add docstring * Updated README.md * Updated README.md and added javadoc for the main pipeline class * remove unused test case * Style fix * Whitespaces fix * Fixed undeclared dependencies and excluded .csv resource files from license analysis * Fix for incorrect rpc url * Fix for nullable types * Data tokenization example group into batches (#11) * GroupIntoBatches was used in the data tokenization pipeline * io files were renamed for the data tokenization template * code format fixed * Getting value from environment variables for maxBufferingDurationMs. Information about it added to README (#14) Getting value from environment variables for maxBufferingDurationMs * [DATAFLOW-139] Incorrect DSG url lead to NPE (#13) * Fix bug incorrect DSG url lead to NPE DATAFLOW-139 Co-authored-by: daria-malkova Co-authored-by: Ilya Kozyrev Co-authored-by: Ramazan Yapparov Co-authored-by: Mikhail Medvedev Co-authored-by: Nuzhdina-Elena <79855159+Nuzhdina-Elena@users.noreply.github.com> Co-authored-by: Nuzhdina-Elena Co-authored-by: MikhailMedvedevAkvelon <78736905+MikhailMedvedevAkvelon@users.noreply.github.com> --- build.gradle.kts | 1 + examples/java/build.gradle | 13 + .../datatokenization/DataTokenization.java | 343 +++++++++++ .../complete/datatokenization/README.md | 172 ++++++ .../options/DataTokenizationOptions.java | 67 +++ .../options/package-info.java | 18 + .../datatokenization/package-info.java | 18 + .../transforms/DataProtectors.java | 290 ++++++++++ .../transforms/JsonToBeamRow.java | 76 +++ .../transforms/SerializableFunctions.java | 49 ++ .../transforms/io/TokenizationBigQueryIO.java | 98 ++++ .../transforms/io/TokenizationBigTableIO.java | 158 ++++++ .../io/TokenizationFileSystemIO.java | 261 +++++++++ .../transforms/io/package-info.java | 18 + .../transforms/package-info.java | 18 + .../datatokenization/utils/CsvConverters.java | 537 ++++++++++++++++++ .../datatokenization/utils/DurationUtils.java | 75 +++ .../utils/ErrorConverters.java | 311 ++++++++++ .../utils/FailsafeElement.java | 97 ++++ .../utils/FailsafeElementCoder.java | 106 ++++ .../datatokenization/utils/RowToCsv.java | 38 ++ .../datatokenization/utils/SchemasUtils.java | 206 +++++++ .../datatokenization/utils/package-info.java | 18 + .../DataTokenizationTest.java | 208 +++++++ examples/java/src/test/resources/schema.txt | 19 + .../java/src/test/resources/testInput.csv | 3 + .../java/src/test/resources/testInput.txt | 3 + .../src/main/resources/beam/suppressions.xml | 1 + 28 files changed, 3222 insertions(+) create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/DataTokenization.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/README.md create mode 
100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/options/DataTokenizationOptions.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/options/package-info.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/package-info.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/JsonToBeamRow.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/SerializableFunctions.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationFileSystemIO.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/package-info.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/package-info.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/DurationUtils.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/FailsafeElement.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/FailsafeElementCoder.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/RowToCsv.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/package-info.java create mode 100644 examples/java/src/test/java/org/apache/beam/examples/complete/datatokenization/DataTokenizationTest.java create mode 100644 examples/java/src/test/resources/schema.txt create mode 100644 examples/java/src/test/resources/testInput.csv create mode 100644 examples/java/src/test/resources/testInput.txt diff --git a/build.gradle.kts b/build.gradle.kts index ddcb1f1d03464..05f6856d77339 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -50,6 +50,7 @@ tasks.rat { "**/test.avsc", "**/user.avsc", "**/test/resources/**/*.txt", + "**/test/resources/**/*.csv", "**/test/**/.placeholder", // Default eclipse excludes neglect subprojects diff --git a/examples/java/build.gradle b/examples/java/build.gradle index 97af4ad483ced..af0d7b80bee90 100644 --- a/examples/java/build.gradle +++ b/examples/java/build.gradle @@ -71,9 +71,15 @@ dependencies { compile library.java.google_code_gson compile library.java.google_http_client compile library.java.google_oauth_client + compile library.java.jackson_databind compile library.java.joda_time + compile library.java.protobuf_java + compile 
library.java.proto_google_cloud_bigtable_v2 compile library.java.proto_google_cloud_datastore_v1 compile library.java.slf4j_api + provided library.java.commons_io + provided library.java.commons_csv + runtime project(path: ":runners:direct-java", configuration: "shadow") compile library.java.vendored_grpc_1_26_0 compile library.java.vendored_guava_26_0_jre compile "com.google.api.grpc:proto-google-cloud-language-v1:1.81.4" @@ -82,6 +88,7 @@ dependencies { // "spotbugs-annotations:3.1.12" used in Beam. Not required. exclude group: "org.apache.zookeeper", module: "zookeeper" } + compile "org.apache.commons:commons-lang3:3.9" compile "org.apache.httpcomponents:httpclient:4.5.13" compile "org.apache.httpcomponents:httpcore:4.4.13" testCompile project(path: ":runners:direct-java", configuration: "shadow") @@ -148,3 +155,9 @@ task preCommit() { } } +task execute (type:JavaExec) { + main = System.getProperty("mainClass") + classpath = sourceSets.main.runtimeClasspath + systemProperties System.getProperties() + args System.getProperty("exec.args", "").split() +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/DataTokenization.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/DataTokenization.java new file mode 100644 index 0000000000000..bbe3759a8ff65 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/DataTokenization.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples.complete.datatokenization; + +import static org.apache.beam.examples.complete.datatokenization.utils.DurationUtils.parseDuration; +import static org.apache.beam.examples.complete.datatokenization.utils.SchemasUtils.DEADLETTER_SCHEMA; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import org.apache.beam.examples.complete.datatokenization.options.DataTokenizationOptions; +import org.apache.beam.examples.complete.datatokenization.transforms.DataProtectors.RowToTokenizedRow; +import org.apache.beam.examples.complete.datatokenization.transforms.JsonToBeamRow; +import org.apache.beam.examples.complete.datatokenization.transforms.SerializableFunctions; +import org.apache.beam.examples.complete.datatokenization.transforms.io.TokenizationBigQueryIO; +import org.apache.beam.examples.complete.datatokenization.transforms.io.TokenizationBigTableIO; +import org.apache.beam.examples.complete.datatokenization.transforms.io.TokenizationFileSystemIO; +import org.apache.beam.examples.complete.datatokenization.utils.ErrorConverters; +import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement; +import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElementCoder; +import org.apache.beam.examples.complete.datatokenization.utils.RowToCsv; +import org.apache.beam.examples.complete.datatokenization.utils.SchemasUtils; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@link DataTokenization} pipeline reads data from one of the supported sources, tokenizes + * data with external API calls to some tokenization server, and writes data into one of the + * supported sinks.
+ *
+ * <p><b>Pipeline Requirements</b>
+ *
+ * <ul>
+ *   <li>Java 8
+ *   <li>Data schema (JSON with an array of fields described in BigQuery format)
+ *   <li>1 of supported sources to read data from
+ *   <li>1 of supported destination sinks to write data into
+ *   <li>A configured tokenization server
+ * </ul>
+ *
+ * <p><b>Example Usage</b>
+ *
+ * <p><b>Gradle Preparation</b>
+ * To run this example, your build.gradle file should contain the following task
+ * to execute the pipeline:
+ *   {@code
+ *   task execute (type:JavaExec) {
+ *      main = System.getProperty("mainClass")
+ *      classpath = sourceSets.main.runtimeClasspath
+ *      systemProperties System.getProperties()
+ *      args System.getProperty("exec.args", "").split()
+ *   }
+ *   }
+ * This task allows you to run the pipeline via the following command:
+ *   {@code
+ *   gradle clean execute -DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization \
+ *        -Dexec.args="--<argument>=<value> --<argument>=<value>"
+ *   }
+ * <p><b>Running the pipeline</b>
+ * To execute this pipeline, specify the parameters:
+ *
+ * - Data schema
+ *     - dataSchemaPath: Path to data schema (JSON format) compatible with BigQuery.
+ * - 1 specified input source out of these:
+ *     - File System
+ *         - inputFilePattern: Filepattern for files to read data from
+ *         - inputFileFormat: File format of input files. Supported formats: JSON, CSV
+ *         - If the input data is in CSV format:
+ *             - csvContainsHeaders: `true` if the input file(s) contain headers, `false` otherwise
+ *             - csvDelimiter: Delimiting character in CSV. Default: use delimiter provided in
+ *               csvFormat
+ *             - csvFormat: CSV format name as defined by Apache Commons CSV. Default:
+ *               [Apache Commons CSV default](https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.html#DEFAULT).
+ *               Must exactly match one of the format names listed at:
+ *               https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.Predefined.html
+ *     - Google Pub/Sub
+ *         - pubsubTopic: The Cloud Pub/Sub topic to read from, in the format of
+ *           'projects/yourproject/topics/yourtopic'
+ * - 1 specified output sink out of these:
+ *     - File System
+ *         - outputDirectory: Directory to write data to
+ *         - outputFileFormat: File format of output files. Supported formats: JSON, CSV
+ *         - windowDuration: The window duration in which data will be written. Should be specified
+ *           only for the 'Pub/Sub -> FileSystem' case. Defaults to 30s.
+ *
+ *           Allowed formats are:
+ *             - Ns (for seconds, example: 5s),
+ *             - Nm (for minutes, example: 12m),
+ *             - Nh (for hours, example: 2h).
+ *     - Google Cloud BigQuery
+ *         - bigQueryTableName: Cloud BigQuery table name to write into
+ *         - tempLocation: Folder in a Google Cloud Storage bucket, which is needed for
+ *           BigQuery to handle data writing
+ *     - Cloud BigTable
+ *         - bigTableProjectId: Id of the project where the Cloud BigTable instance to write into
+ *           is located
+ *         - bigTableInstanceId: Id of the Cloud BigTable instance to write into
+ *         - bigTableTableId: Id of the Cloud BigTable table to write into
+ *         - bigTableKeyColumnName: Column name to use as a key in Cloud BigTable
+ *         - bigTableColumnFamilyName: Column family name to use in Cloud BigTable
+ * - RPC server parameters
+ *     - rpcUri: URI for the API calls to RPC server
+ *     - batchSize: Size of the batch to send to RPC server per request
+ *
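+ * The pipeline POSTs each batch to rpcUri as a JSON object containing a "data" array (see the
+ * {@code DataProtectors} transform in this example) and expects the response to carry the same
+ * structure. The sketch below shows the shape of one such exchange; the field names are
+ * illustrative only, and the "ID" field is added by the pipeline to correlate rows with the
+ * response and is dropped from the final output:
+ *
+ *   {@code
+ *   // request body sent to rpcUri
+ *   {"data": [{"userName": "alice", "userEmail": "alice@example.com", "ID": "6f1c..."}]}
+ *
+ *   // expected response body (same fields with tokenized values, same ID)
+ *   {"data": [{"userName": "tok_9f2d", "userEmail": "tok_41ab", "ID": "6f1c..."}]}
+ *   }
+ *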
+ * The template allows the user to supply the following optional parameter:
+ *
+ * - nonTokenizedDeadLetterPath: Folder where data that failed to tokenize will be stored
+ *
+ *
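+ * A minimal sketch of a data schema file for dataSchemaPath is shown below. It is a
+ * BigQuery-style JSON schema with a top-level "fields" array; the field names here are
+ * illustrative only:
+ *
+ *   {@code
+ *   {
+ *     "fields": [
+ *       {"name": "userName", "type": "STRING"},
+ *       {"name": "userEmail", "type": "STRING"}
+ *     ]
+ *   }
+ *   }
+ *
+ * With csvContainsHeaders=true, a CSV input for this schema would start with the header line
+ * "userName,userEmail"; a JSON input would typically contain one object per line with the same
+ * field names.
+ *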
+ * Specify the parameters in the following format:
+ *
+ * {@code
+ * --dataSchemaPath="path-to-data-schema-in-json-format"
+ * --inputFilePattern="path-pattern-to-input-data"
+ * --outputDirectory="path-to-output-directory"
+ * # example for CSV case
+ * --inputFileFormat="CSV"
+ * --outputFileFormat="CSV"
+ * --csvContainsHeaders="true"
+ * --nonTokenizedDeadLetterPath="path-to-errors-rows-writing"
+ * --batchSize=batch-size-number
+ * --rpcUri=http://host:port/tokenize
+ * }
+ *
+ * By default, this will run the pipeline locally with the DirectRunner. To change the runner, specify:
+ *
+ * {@code
+ * --runner=YOUR_SELECTED_RUNNER
+ * }
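+ *
+ * For instance, a complete local run that reads CSV from the file system might look like the
+ * following sketch (all paths and the host are illustrative placeholders, not defaults):
+ *
+ * {@code
+ * gradle clean execute -DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization \
+ *     -Dexec.args="--dataSchemaPath=/path/to/schema.json \
+ *                  --inputFilePattern=/path/to/input/*.csv \
+ *                  --inputFileFormat=CSV --csvContainsHeaders=true \
+ *                  --outputDirectory=/path/to/output/ --outputFileFormat=JSON \
+ *                  --nonTokenizedDeadLetterPath=/path/to/errors/ \
+ *                  --rpcUri=http://localhost:8080/tokenize --batchSize=10"
+ * }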
+ * 
+ */ +public class DataTokenization { + + /** Logger for class. */ + private static final Logger LOG = LoggerFactory.getLogger(DataTokenization.class); + + /** String/String Coder for FailsafeElement. */ + public static final FailsafeElementCoder FAILSAFE_ELEMENT_CODER = + FailsafeElementCoder.of( + NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of())); + + /** The default suffix for error tables if dead letter table is not specified. */ + private static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records"; + + /** The tag for the main output for the UDF. */ + private static final TupleTag TOKENIZATION_OUT = new TupleTag() {}; + + /** The tag for the dead-letter output of the udf. */ + static final TupleTag> TOKENIZATION_DEADLETTER_OUT = + new TupleTag>() {}; + + /** + * Main entry point for pipeline execution. + * + * @param args Command line arguments to the pipeline. + */ + public static void main(String[] args) { + DataTokenizationOptions options = + PipelineOptionsFactory.fromArgs(args).withValidation().as(DataTokenizationOptions.class); + FileSystems.setDefaultPipelineOptions(options); + + run(options); + } + + /** + * Runs the pipeline to completion with the specified options. + * + * @param options The execution options. + * @return The pipeline result. + */ + @SuppressWarnings({"dereference.of.nullable", "argument.type.incompatible"}) + public static PipelineResult run(DataTokenizationOptions options) { + SchemasUtils schema = null; + try { + schema = new SchemasUtils(options.getDataSchemaPath(), StandardCharsets.UTF_8); + } catch (IOException e) { + LOG.error("Failed to retrieve schema for data.", e); + } + checkArgument(schema != null, "Data schema is mandatory."); + + // Create the pipeline + Pipeline pipeline = Pipeline.create(options); + // Register the coder for pipeline + CoderRegistry coderRegistry = pipeline.getCoderRegistry(); + coderRegistry.registerCoderForType( + FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER); + coderRegistry.registerCoderForType( + RowCoder.of(schema.getBeamSchema()).getEncodedTypeDescriptor(), + RowCoder.of(schema.getBeamSchema())); + + /* + * Row/Row Coder for FailsafeElement. 
+ */ + FailsafeElementCoder coder = + FailsafeElementCoder.of( + RowCoder.of(schema.getBeamSchema()), RowCoder.of(schema.getBeamSchema())); + + coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder); + + PCollection rows; + if (options.getInputFilePattern() != null) { + rows = new TokenizationFileSystemIO(options).read(pipeline, schema); + } else if (options.getPubsubTopic() != null) { + rows = + pipeline + .apply( + "ReadMessagesFromPubsub", + PubsubIO.readStrings().fromTopic(options.getPubsubTopic())) + .apply( + "TransformToBeamRow", + new JsonToBeamRow(options.getNonTokenizedDeadLetterPath(), schema)); + if (options.getOutputDirectory() != null) { + rows = rows.apply(Window.into(FixedWindows.of(parseDuration(options.getWindowDuration())))); + } + } else { + throw new IllegalStateException( + "No source is provided, please configure File System or Pub/Sub"); + } + + /* + Tokenize data using remote API call + */ + PCollectionTuple tokenizedRows = + rows.setRowSchema(schema.getBeamSchema()) + .apply( + MapElements.into( + TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.rows())) + .via((Row row) -> KV.of(0, row))) + .setCoder(KvCoder.of(VarIntCoder.of(), RowCoder.of(schema.getBeamSchema()))) + .apply( + "DsgTokenization", + RowToTokenizedRow.newBuilder() + .setBatchSize(options.getBatchSize()) + .setRpcURI(options.getRpcUri()) + .setSchema(schema.getBeamSchema()) + .setSuccessTag(TOKENIZATION_OUT) + .setFailureTag(TOKENIZATION_DEADLETTER_OUT) + .build()); + + String csvDelimiter = options.getCsvDelimiter(); + if (options.getNonTokenizedDeadLetterPath() != null) { + /* + Write tokenization errors to dead-letter sink + */ + tokenizedRows + .get(TOKENIZATION_DEADLETTER_OUT) + .apply( + "ConvertToCSV", + MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor()) + .via( + (FailsafeElement fse) -> + FailsafeElement.of( + new RowToCsv(csvDelimiter).getCsvFromRow(fse.getOriginalPayload()), + new RowToCsv(csvDelimiter).getCsvFromRow(fse.getPayload())))) + .apply( + "WriteTokenizationErrorsToFS", + ErrorConverters.WriteErrorsToTextIO.newBuilder() + .setErrorWritePath(options.getNonTokenizedDeadLetterPath()) + .setTranslateFunction(SerializableFunctions.getCsvErrorConverter()) + .build()); + } + + if (options.getOutputDirectory() != null) { + new TokenizationFileSystemIO(options) + .write(tokenizedRows.get(TOKENIZATION_OUT), schema.getBeamSchema()); + } else if (options.getBigQueryTableName() != null) { + WriteResult writeResult = + TokenizationBigQueryIO.write( + tokenizedRows.get(TOKENIZATION_OUT), + options.getBigQueryTableName(), + schema.getBigQuerySchema()); + writeResult + .getFailedInsertsWithErr() + .apply( + "WrapInsertionErrors", + MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor()) + .via(TokenizationBigQueryIO::wrapBigQueryInsertError)) + .setCoder(FAILSAFE_ELEMENT_CODER) + .apply( + "WriteInsertionFailedRecords", + ErrorConverters.WriteStringMessageErrors.newBuilder() + .setErrorRecordsTable( + options.getBigQueryTableName() + DEFAULT_DEADLETTER_TABLE_SUFFIX) + .setErrorRecordsTableSchema(DEADLETTER_SCHEMA) + .build()); + } else if (options.getBigTableInstanceId() != null) { + new TokenizationBigTableIO(options) + .write(tokenizedRows.get(TOKENIZATION_OUT), schema.getBeamSchema()); + } else { + throw new IllegalStateException( + "No sink is provided, please configure BigQuery or BigTable."); + } + + return pipeline.run(); + } +} diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/README.md b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/README.md new file mode 100644 index 0000000000000..7d92f604ee4f5 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/README.md @@ -0,0 +1,172 @@ + + +# Apache Beam pipeline example to tokenize data using remote RPC server + +This directory contains an Apache Beam example that creates a pipeline to read data from one of +the supported sources, tokenize data with external API calls to remote RPC server, and write data into one of the supported sinks. + +Supported data formats: + +- JSON +- CSV + +Supported input sources: + +- File system +- [Google Pub/Sub](https://cloud.google.com/pubsub) + +Supported destination sinks: + +- File system +- [Google Cloud BigQuery](https://cloud.google.com/bigquery) +- [Cloud BigTable](https://cloud.google.com/bigtable) + +Supported data schema format: + +- JSON with an array of fields described in BigQuery format + +In the main scenario, the template will create an Apache Beam pipeline that will read data in CSV or +JSON format from a specified input source, send the data to an external processing server, receive +processed data, and write it into a specified output sink. + +## Requirements + +- Java 8 +- 1 of supported sources to read data from +- 1 of supported destination sinks to write data into +- A configured RPC to tokenize data + +## Getting Started + +This section describes what is needed to get the template up and running. + +- Gradle preparation +- Local execution +- Running as a Dataflow Template + - Setting Up Project Environment + - Build Data Tokenization Dataflow Flex Template + - Creating the Dataflow Flex Template + - Executing Template + +## Gradle preparation + +To run this example your `build.gradle` file should contain the following task to execute the pipeline: + +``` +task execute (type:JavaExec) { + main = System.getProperty("mainClass") + classpath = sourceSets.main.runtimeClasspath + systemProperties System.getProperties() + args System.getProperty("exec.args", "").split() +} +``` + +This task allows to run the pipeline via the following command: + +```bash +gradle clean execute -DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization \ + -Dexec.args="--= --=" +``` + +## Running the pipeline + +To execute this pipeline, specify the parameters: + +- Data schema + - **dataSchemaPath**: Path to data schema (JSON format) compatible with BigQuery. +- 1 specified input source out of these: + - File System + - **inputFilePattern**: Filepattern for files to read data from + - **inputFileFormat**: File format of input files. Supported formats: JSON, CSV + - In case if input data is in CSV format: + - **csvContainsHeaders**: `true` if file(s) in bucket to read data from contain headers, + and `false` otherwise + - **csvDelimiter**: Delimiting character in CSV. Default: use delimiter provided in + csvFormat + - **csvFormat**: Csv format according to Apache Commons CSV format. Default is: + [Apache Commons CSV default](https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.html#DEFAULT) + . 
Must match format names exactly found + at: https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.Predefined.html + - Google Pub/Sub + - **pubsubTopic**: The Cloud Pub/Sub topic to read from, in the format of ' + projects/yourproject/topics/yourtopic' +- 1 specified output sink out of these: + - File System + - **outputDirectory**: Directory to write data to + - **outputFileFormat**: File format of output files. Supported formats: JSON, CSV + - **windowDuration**: The window duration in which data will be written. Should be specified + only for 'Pub/Sub -> GCS' case. Defaults to 30s. + + Allowed formats are: + - Ns (for seconds, example: 5s), + - Nm (for minutes, example: 12m), + - Nh (for hours, example: 2h). + - Google Cloud BigQuery + - **bigQueryTableName**: Cloud BigQuery table name to write into + - **tempLocation**: Folder in a Google Cloud Storage bucket, which is needed for + BigQuery to handle data writing + - Cloud BigTable + - **bigTableProjectId**: Id of the project where the Cloud BigTable instance to write into + is located + - **bigTableInstanceId**: Id of the Cloud BigTable instance to write into + - **bigTableTableId**: Id of the Cloud BigTable table to write into + - **bigTableKeyColumnName**: Column name to use as a key in Cloud BigTable + - **bigTableColumnFamilyName**: Column family name to use in Cloud BigTable +- RPC server parameters + - **rpcUri**: URI for the API calls to RPC server + - **batchSize**: Size of the batch to send to RPC server per request + +The template allows for the user to supply the following optional parameter: + +- **nonTokenizedDeadLetterPath**: Folder where failed to tokenize data will be stored + +The template also allows user to override the environment variable: + +- **MAX_BUFFERING_DURATION_MS**: Max duration of buffering rows in milliseconds. Default value: 100ms. + +in the following format: + +```bash +--dataSchemaPath="path-to-data-schema-in-json-format" +--inputFilePattern="path-pattern-to-input-data" +--outputDirectory="path-to-output-directory" +# example for CSV case +--inputFileFormat="CSV" +--outputFileFormat="CSV" +--csvContainsHeaders="true" +--nonTokenizedDeadLetterPath="path-to-errors-rows-writing" +--batchSize=batch-size-number +--rpcUri=http://host:port/tokenize +``` + +By default, this will run the pipeline locally with the DirectRunner. To change the runner, specify: + +```bash +--runner=YOUR_SELECTED_RUNNER +``` + +See the [documentation](http://beam.apache.org/get-started/quickstart/) and +the [Examples README](../../../../../../../../../README.md) for more information about how to run this example. + +## Running as a Dataflow Template + +This example also exists as Google Dataflow Template, which you can build and run using Google Cloud Platform. See +this template documentation [README.md](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/v2/protegrity-data-tokenization/README.md) for +more information. 
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/options/DataTokenizationOptions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/options/DataTokenizationOptions.java new file mode 100644 index 0000000000000..c68a5378e4878 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/options/DataTokenizationOptions.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.datatokenization.options; + +import org.apache.beam.examples.complete.datatokenization.transforms.io.TokenizationBigTableIO; +import org.apache.beam.examples.complete.datatokenization.transforms.io.TokenizationFileSystemIO.FileSystemPipelineOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * The {@link DataTokenizationOptions} interface provides the custom execution options passed by the + * executor at the command-line. + */ +public interface DataTokenizationOptions + extends PipelineOptions, FileSystemPipelineOptions, TokenizationBigTableIO.BigTableOptions { + + @Description("Path to data schema (JSON format) compatible with BigQuery.") + String getDataSchemaPath(); + + void setDataSchemaPath(String dataSchemaPath); + + @Description( + "The Cloud Pub/Sub topic to read from." + + "The name should be in the format of " + + "projects//topics/.") + String getPubsubTopic(); + + void setPubsubTopic(String pubsubTopic); + + @Description("Cloud BigQuery table name to write into.") + String getBigQueryTableName(); + + void setBigQueryTableName(String bigQueryTableName); + + // Tokenization API specific parameters + @Description("URI for the API calls to RPC server.") + String getRpcUri(); + + void setRpcUri(String dsgUri); + + @Description("Size of the batch to send to RPC server per request.") + @Default.Integer(10) + Integer getBatchSize(); + + void setBatchSize(Integer batchSize); + + @Description("Dead-Letter path to store not-tokenized data") + String getNonTokenizedDeadLetterPath(); + + void setNonTokenizedDeadLetterPath(String nonTokenizedDeadLetterPath); +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/options/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/options/package-info.java new file mode 100644 index 0000000000000..b61fce1519103 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/options/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2020 Google Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** Protegrity Data Tokenization template for Google Cloud Teleport. */ +package org.apache.beam.examples.complete.datatokenization.options; diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/package-info.java new file mode 100644 index 0000000000000..09d4df153c3bf --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2020 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** Protegrity Data Tokenization template for Google Cloud Teleport. */ +package org.apache.beam.examples.complete.datatokenization; diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java new file mode 100644 index 0000000000000..5ddf706d6f2da --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples.complete.datatokenization.transforms; + +import static org.apache.beam.sdk.util.RowJsonUtils.rowToJson; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.services.bigquery.model.TableRow; +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement; +import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElementCoder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.RowJson; +import org.apache.beam.sdk.util.RowJsonUtils; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.Gson; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.JsonArray; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.JsonObject; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@link DataProtectors} Using passing parameters transform will buffer input rows in batch and + * will send it when the count of buffered rows will equal specified batch size. When it takes the + * last one batch, it will send it when the last row will come to doFn even count of buffered rows + * will less than the batch size. + */ +public class DataProtectors { + + /** Logger for class. */ + private static final Logger LOG = LoggerFactory.getLogger(DataProtectors.class); + + public static final String ID_FIELD_NAME = "ID"; + private static final Long MAX_BUFFERING_DURATION_MS = + Long.valueOf(System.getenv().getOrDefault("MAX_BUFFERING_DURATION_MS", "100")); + + /** + * The {@link RowToTokenizedRow} transform converts {@link Row} to {@link TableRow} objects. The + * transform accepts a {@link FailsafeElement} object so the original payload of the incoming + * record can be maintained across multiple series of transforms. 
+ */ + @AutoValue + public abstract static class RowToTokenizedRow + extends PTransform>, PCollectionTuple> { + + public static Builder newBuilder() { + return new AutoValue_DataProtectors_RowToTokenizedRow.Builder<>(); + } + + public abstract TupleTag successTag(); + + public abstract TupleTag> failureTag(); + + public abstract Schema schema(); + + public abstract int batchSize(); + + public abstract String rpcURI(); + + @Override + public PCollectionTuple expand(PCollection> inputRows) { + FailsafeElementCoder coder = + FailsafeElementCoder.of(RowCoder.of(schema()), RowCoder.of(schema())); + + Duration maxBuffering = Duration.millis(MAX_BUFFERING_DURATION_MS); + PCollectionTuple pCollectionTuple = + inputRows + .apply( + "GroupRowsIntoBatches", + GroupIntoBatches.ofSize(batchSize()) + .withMaxBufferingDuration(maxBuffering)) + .apply( + "Tokenize", + ParDo.of(new TokenizationFn(schema(), rpcURI(), failureTag())) + .withOutputTags(successTag(), TupleTagList.of(failureTag()))); + + return PCollectionTuple.of( + successTag(), pCollectionTuple.get(successTag()).setRowSchema(schema())) + .and(failureTag(), pCollectionTuple.get(failureTag()).setCoder(coder)); + } + + /** Builder for {@link RowToTokenizedRow}. */ + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setSuccessTag(TupleTag successTag); + + public abstract Builder setFailureTag(TupleTag> failureTag); + + public abstract Builder setSchema(Schema schema); + + public abstract Builder setBatchSize(int batchSize); + + public abstract Builder setRpcURI(String rpcURI); + + public abstract RowToTokenizedRow build(); + } + } + + /** Class implements stateful doFn for data tokenization using remote RPC. */ + @SuppressWarnings("initialization.static.fields.uninitialized") + public static class TokenizationFn extends DoFn>, Row> { + + private static Schema schemaToRpc; + private static CloseableHttpClient httpclient; + private static ObjectMapper objectMapperSerializerForSchema; + private static ObjectMapper objectMapperDeserializerForSchema; + + private final Schema schema; + private final String rpcURI; + private final TupleTag> failureTag; + + private Map inputRowsWithIds; + + public TokenizationFn( + Schema schema, String rpcURI, TupleTag> failureTag) { + this.schema = schema; + this.rpcURI = rpcURI; + this.failureTag = failureTag; + this.inputRowsWithIds = new HashMap<>(); + } + + @Setup + public void setup() { + + List fields = schema.getFields(); + fields.add(Field.of(ID_FIELD_NAME, FieldType.STRING)); + schemaToRpc = new Schema(fields); + + objectMapperSerializerForSchema = + RowJsonUtils.newObjectMapperWith(RowJson.RowJsonSerializer.forSchema(schemaToRpc)); + + objectMapperDeserializerForSchema = + RowJsonUtils.newObjectMapperWith(RowJson.RowJsonDeserializer.forSchema(schemaToRpc)); + + httpclient = HttpClients.createDefault(); + } + + @Teardown + public void close() { + try { + httpclient.close(); + } catch (IOException exception) { + String exceptionMessage = exception.getMessage(); + if (exceptionMessage != null) { + LOG.warn("Can't close connection: {}", exceptionMessage); + } + } + } + + @ProcessElement + @SuppressWarnings("argument.type.incompatible") + public void process(@Element KV> element, ProcessContext context) { + Iterable rows = element.getValue(); + + try { + for (Row outputRow : getTokenizedRow(rows)) { + context.output(outputRow); + } + } catch (Exception e) { + for (Row outputRow : rows) { + context.output( + failureTag, + FailsafeElement.of(outputRow, outputRow) + 
.setErrorMessage(e.getMessage()) + .setStacktrace(Throwables.getStackTraceAsString(e))); + } + } + } + + private ArrayList rowsToJsons(Iterable inputRows) { + ArrayList jsons = new ArrayList<>(); + Map inputRowsWithIds = new HashMap<>(); + for (Row inputRow : inputRows) { + + Row.Builder builder = Row.withSchema(schemaToRpc); + for (Schema.Field field : schemaToRpc.getFields()) { + if (inputRow.getSchema().hasField(field.getName())) { + builder = builder.addValue(inputRow.getValue(field.getName())); + } + } + String id = UUID.randomUUID().toString(); + builder = builder.addValue(id); + inputRowsWithIds.put(id, inputRow); + + Row row = builder.build(); + + jsons.add(rowToJson(objectMapperSerializerForSchema, row)); + } + this.inputRowsWithIds = inputRowsWithIds; + return jsons; + } + + private String formatJsonsToRpcBatch(Iterable jsons) { + StringBuilder stringBuilder = new StringBuilder(String.join(",", jsons)); + stringBuilder.append("]").insert(0, "{\"data\": [").append("}"); + return stringBuilder.toString(); + } + + @SuppressWarnings("argument.type.incompatible") + private ArrayList getTokenizedRow(Iterable inputRows) throws IOException { + ArrayList outputRows = new ArrayList<>(); + + CloseableHttpResponse response = + sendRpc(formatJsonsToRpcBatch(rowsToJsons(inputRows)).getBytes(Charset.defaultCharset())); + + if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + LOG.error("Send to RPC '{}' failed with '{}'", this.rpcURI, response.getStatusLine()); + } + + String tokenizedData = + IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); + + Gson gson = new Gson(); + JsonArray jsonTokenizedRows = + gson.fromJson(tokenizedData, JsonObject.class).getAsJsonArray("data"); + + for (int i = 0; i < jsonTokenizedRows.size(); i++) { + Row tokenizedRow = + RowJsonUtils.jsonToRow( + objectMapperDeserializerForSchema, jsonTokenizedRows.get(i).toString()); + Row.FieldValueBuilder rowBuilder = + Row.fromRow(this.inputRowsWithIds.get(tokenizedRow.getString(ID_FIELD_NAME))); + for (Schema.Field field : schemaToRpc.getFields()) { + if (field.getName().equals(ID_FIELD_NAME)) { + continue; + } + rowBuilder = + rowBuilder.withFieldValue(field.getName(), tokenizedRow.getValue(field.getName())); + } + outputRows.add(rowBuilder.build()); + } + + return outputRows; + } + + private CloseableHttpResponse sendRpc(byte[] data) throws IOException { + HttpPost httpPost = new HttpPost(rpcURI); + HttpEntity stringEntity = new ByteArrayEntity(data, ContentType.APPLICATION_JSON); + httpPost.setEntity(stringEntity); + return httpclient.execute(httpPost); + } + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/JsonToBeamRow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/JsonToBeamRow.java new file mode 100644 index 0000000000000..a6c87a368cf98 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/JsonToBeamRow.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.datatokenization.transforms; + +import static org.apache.beam.examples.complete.datatokenization.DataTokenization.FAILSAFE_ELEMENT_CODER; + +import org.apache.beam.examples.complete.datatokenization.utils.ErrorConverters; +import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement; +import org.apache.beam.examples.complete.datatokenization.utils.SchemasUtils; +import org.apache.beam.sdk.transforms.JsonToRow; +import org.apache.beam.sdk.transforms.JsonToRow.ParseResult; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; + +/** The {@link JsonToBeamRow} converts jsons string to beam rows. */ +public class JsonToBeamRow extends PTransform, PCollection> { + + private final String failedToParseDeadLetterPath; + private final transient SchemasUtils schema; + + public JsonToBeamRow(String failedToParseDeadLetterPath, SchemasUtils schema) { + this.failedToParseDeadLetterPath = failedToParseDeadLetterPath; + this.schema = schema; + } + + @Override + @SuppressWarnings("argument.type.incompatible") + public PCollection expand(PCollection jsons) { + ParseResult rows = + jsons.apply( + "JsonToRow", + JsonToRow.withExceptionReporting(schema.getBeamSchema()).withExtendedErrorInfo()); + + if (failedToParseDeadLetterPath != null) { + /* + * Write Row conversion errors to filesystem specified path + */ + rows.getFailedToParseLines() + .apply( + "ToFailsafeElement", + MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor()) + .via( + (Row errRow) -> + FailsafeElement.of( + Strings.nullToEmpty(errRow.getString("line")), + Strings.nullToEmpty(errRow.getString("line"))) + .setErrorMessage(Strings.nullToEmpty(errRow.getString("err"))))) + .apply( + "WriteCsvConversionErrorsToFS", + ErrorConverters.WriteErrorsToTextIO.newBuilder() + .setErrorWritePath(failedToParseDeadLetterPath) + .setTranslateFunction(SerializableFunctions.getCsvErrorConverter()) + .build()); + } + + return rows.getResults().setRowSchema(schema.getBeamSchema()); + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/SerializableFunctions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/SerializableFunctions.java new file mode 100644 index 0000000000000..1e5df0776e166 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/SerializableFunctions.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.datatokenization.transforms; + +import java.time.Instant; +import java.util.ArrayList; +import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement; +import org.apache.beam.sdk.transforms.SerializableFunction; + +/** The {@link SerializableFunctions} class to store static Serializable functions. */ +public class SerializableFunctions { + + private static final SerializableFunction, String> + csvErrorConverter = + (FailsafeElement failsafeElement) -> { + ArrayList outputRow = new ArrayList<>(); + final String message = failsafeElement.getOriginalPayload(); + String timestamp = Instant.now().toString(); + outputRow.add(timestamp); + outputRow.add(failsafeElement.getErrorMessage()); + outputRow.add(failsafeElement.getStacktrace()); + // Only set the payload if it's populated on the message. + if (failsafeElement.getOriginalPayload() != null) { + outputRow.add(message); + } + + return String.join(",", outputRow); + }; + + public static SerializableFunction, String> + getCsvErrorConverter() { + return csvErrorConverter; + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java new file mode 100644 index 0000000000000..fe8f4c1afad8b --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples.complete.datatokenization.transforms.io; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; +import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy; +import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** The {@link TokenizationBigQueryIO} class for writing data from template to BigTable. */ +public class TokenizationBigQueryIO { + + /** Logger for class. */ + private static final Logger LOG = LoggerFactory.getLogger(TokenizationBigQueryIO.class); + + public static WriteResult write( + PCollection input, String bigQueryTableName, TableSchema schema) { + return input + .apply("RowToTableRow", ParDo.of(new RowToTableRowFn())) + .apply( + "WriteSuccessfulRecords", + org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.writeTableRows() + .withCreateDisposition( + org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition + .CREATE_IF_NEEDED) + .withWriteDisposition( + org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition + .WRITE_APPEND) + .withExtendedErrorInfo() + .withMethod( + org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.STREAMING_INSERTS) + .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()) + .withSchema(schema) + .to(bigQueryTableName)); + } + + /** + * Method to wrap a {@link BigQueryInsertError} into a {@link FailsafeElement}. + * + * @param insertError BigQueryInsert error. + * @return FailsafeElement object. + */ + public static FailsafeElement wrapBigQueryInsertError( + BigQueryInsertError insertError) { + + FailsafeElement failsafeElement; + try { + + failsafeElement = + FailsafeElement.of( + insertError.getRow().toPrettyString(), insertError.getRow().toPrettyString()); + failsafeElement.setErrorMessage(insertError.getError().toPrettyString()); + + } catch (IOException e) { + TokenizationBigQueryIO.LOG.error("Failed to wrap BigQuery insert error."); + throw new RuntimeException(e); + } + return failsafeElement; + } + + /** + * The {@link RowToTableRowFn} class converts a row to tableRow using {@link + * BigQueryUtils#toTableRow()}. + */ + public static class RowToTableRowFn extends DoFn { + + @ProcessElement + public void processElement(ProcessContext context) { + Row row = context.element(); + context.output(BigQueryUtils.toTableRow(row)); + } + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java new file mode 100644 index 0000000000000..d7d1c3e972320 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.datatokenization.transforms.io; + +import com.google.bigtable.v2.Mutation; +import com.google.protobuf.ByteString; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.beam.examples.complete.datatokenization.options.DataTokenizationOptions; +import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO; +import org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteResult; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** The {@link TokenizationBigTableIO} class for writing data from template to BigTable. */ +public class TokenizationBigTableIO { + + /** Logger for class. 
*/ + private static final Logger LOG = LoggerFactory.getLogger(TokenizationBigTableIO.class); + + private final DataTokenizationOptions options; + + public TokenizationBigTableIO(DataTokenizationOptions options) { + this.options = options; + } + + public PDone write(PCollection input, Schema schema) { + return input + .apply("ConvertToBigTableFormat", ParDo.of(new TransformToBigTableFormat(schema))) + .apply( + "WriteToBigTable", + BigtableIO.write() + .withProjectId(options.getBigTableProjectId()) + .withInstanceId(options.getBigTableInstanceId()) + .withTableId(options.getBigTableTableId()) + .withWriteResults()) + .apply("LogRowCount", new LogSuccessfulRows()); + } + + static class TransformToBigTableFormat extends DoFn>> { + + private final Schema schema; + + TransformToBigTableFormat(Schema schema) { + this.schema = schema; + } + + @ProcessElement + public void processElement( + @Element Row in, OutputReceiver>> out, ProcessContext c) { + DataTokenizationOptions options = c.getPipelineOptions().as(DataTokenizationOptions.class); + // Mapping every field in provided Row to Mutation.SetCell, which will create/update + // cell content with provided data + Set mutations = + schema.getFields().stream() + .map(Schema.Field::getName) + // Ignoring key field, otherwise it will be added as regular column + .filter(fieldName -> !Objects.equals(fieldName, options.getBigTableKeyColumnName())) + .map(fieldName -> Pair.of(fieldName, in.getString(fieldName))) + .map( + pair -> + Mutation.newBuilder() + .setSetCell( + Mutation.SetCell.newBuilder() + .setFamilyName(options.getBigTableColumnFamilyName()) + .setColumnQualifier( + ByteString.copyFrom(pair.getKey(), StandardCharsets.UTF_8)) + .setValue( + ByteString.copyFrom(pair.getValue(), StandardCharsets.UTF_8)) + .setTimestampMicros(System.currentTimeMillis() * 1000) + .build()) + .build()) + .collect(Collectors.toSet()); + // Converting key value to BigTable format + String columnName = in.getString(options.getBigTableKeyColumnName()); + if (columnName != null) { + ByteString key = ByteString.copyFrom(columnName, StandardCharsets.UTF_8); + out.output(KV.of(key, mutations)); + } + } + } + + static class LogSuccessfulRows extends PTransform, PDone> { + + @Override + public PDone expand(PCollection input) { + input.apply( + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(@Element BigtableWriteResult in) { + LOG.info("Successfully wrote {} rows.", in.getRowsWritten()); + } + })); + return PDone.in(input.getPipeline()); + } + } + + /** + * Necessary {@link PipelineOptions} options for Pipelines that perform write operations to + * BigTable. 
+ */ + public interface BigTableOptions extends PipelineOptions { + + @Description("Id of the project where the Cloud BigTable instance to write into is located.") + String getBigTableProjectId(); + + void setBigTableProjectId(String bigTableProjectId); + + @Description("Id of the Cloud BigTable instance to write into.") + String getBigTableInstanceId(); + + void setBigTableInstanceId(String bigTableInstanceId); + + @Description("Id of the Cloud BigTable table to write into.") + String getBigTableTableId(); + + void setBigTableTableId(String bigTableTableId); + + @Description("Column name to use as a key in Cloud BigTable.") + String getBigTableKeyColumnName(); + + void setBigTableKeyColumnName(String bigTableKeyColumnName); + + @Description("Column family name to use in Cloud BigTable.") + String getBigTableColumnFamilyName(); + + void setBigTableColumnFamilyName(String bigTableColumnFamilyName); + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationFileSystemIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationFileSystemIO.java new file mode 100644 index 0000000000000..54805e11e2d5d --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationFileSystemIO.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
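Note: a minimal wiring sketch, not taken from the patch itself, showing how the BigTable sink above might be attached to a pipeline. "args" and "rows" are hypothetical; every Row field except the configured key column becomes one Mutation.SetCell in the configured column family, so "rows" is assumed to carry an attached schema whose field names match those settings.

    // Hypothetical wiring: the options object carries the BigTable* values declared
    // in BigTableOptions above (project, instance, table, key column, column family).
    DataTokenizationOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataTokenizationOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    // ... build a PCollection<Row> named "rows" with an attached schema ...
    new TokenizationBigTableIO(options).write(rows, rows.getSchema());
    pipeline.run().waitUntilFinish();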
+ */ +package org.apache.beam.examples.complete.datatokenization.transforms.io; + +import org.apache.beam.examples.complete.datatokenization.options.DataTokenizationOptions; +import org.apache.beam.examples.complete.datatokenization.transforms.JsonToBeamRow; +import org.apache.beam.examples.complete.datatokenization.transforms.SerializableFunctions; +import org.apache.beam.examples.complete.datatokenization.utils.CsvConverters; +import org.apache.beam.examples.complete.datatokenization.utils.ErrorConverters; +import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement; +import org.apache.beam.examples.complete.datatokenization.utils.RowToCsv; +import org.apache.beam.examples.complete.datatokenization.utils.SchemasUtils; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ToJson; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TypeDescriptors; + +/** The {@link TokenizationFileSystemIO} class to read/write data from/into File Systems. */ +public class TokenizationFileSystemIO { + + /** The tag for the headers of the CSV if required. */ + static final TupleTag CSV_HEADERS = new TupleTag() {}; + + /** The tag for the lines of the CSV. */ + static final TupleTag CSV_LINES = new TupleTag() {}; + + /** The tag for the dead-letter output. */ + static final TupleTag> PROCESSING_DEADLETTER_OUT = + new TupleTag>() {}; + + /** The tag for the main output. */ + static final TupleTag> PROCESSING_OUT = + new TupleTag>() {}; + + /** Supported format to read from GCS. */ + public enum FORMAT { + JSON, + CSV + } + + /** + * Necessary {@link PipelineOptions} options for Pipelines that operate with JSON/CSV data in FS. + */ + public interface FileSystemPipelineOptions extends PipelineOptions { + + @Description("Filepattern for files to read data from") + String getInputFilePattern(); + + void setInputFilePattern(String inputFilePattern); + + @Description("File format of input files. Supported formats: JSON, CSV") + @Default.Enum("JSON") + TokenizationFileSystemIO.FORMAT getInputFileFormat(); + + void setInputFileFormat(FORMAT inputFileFormat); + + @Description("Directory to write data to") + String getOutputDirectory(); + + void setOutputDirectory(String outputDirectory); + + @Description("File format of output files. Supported formats: JSON, CSV") + @Default.Enum("JSON") + TokenizationFileSystemIO.FORMAT getOutputFileFormat(); + + void setOutputFileFormat(FORMAT outputFileFormat); + + @Description( + "The window duration in which data will be written. " + + "Should be specified only for 'Pub/Sub -> FS' case. Defaults to 30s. 
" + + "Allowed formats are: " + + "Ns (for seconds, example: 5s), " + + "Nm (for minutes, example: 12m), " + + "Nh (for hours, example: 2h).") + @Default.String("30s") + String getWindowDuration(); + + void setWindowDuration(String windowDuration); + + // CSV parameters + @Description("If file(s) contain headers") + Boolean getCsvContainsHeaders(); + + void setCsvContainsHeaders(Boolean csvContainsHeaders); + + @Description("Delimiting character in CSV. Default: use delimiter provided in csvFormat") + @Default.InstanceFactory(CsvConverters.DelimiterFactory.class) + String getCsvDelimiter(); + + void setCsvDelimiter(String csvDelimiter); + + @Description( + "Csv format according to Apache Commons CSV format. Default is: Apache Commons CSV" + + " default\n" + + "https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.html#DEFAULT\n" + + "Must match format names exactly found at: " + + "https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.Predefined.html") + @Default.String("Default") + String getCsvFormat(); + + void setCsvFormat(String csvFormat); + } + + private final DataTokenizationOptions options; + + public TokenizationFileSystemIO(DataTokenizationOptions options) { + this.options = options; + } + + public PCollection read(Pipeline pipeline, SchemasUtils schema) { + switch (options.getInputFileFormat()) { + case JSON: + return readJson(pipeline) + .apply(new JsonToBeamRow(options.getNonTokenizedDeadLetterPath(), schema)); + case CSV: + return readCsv(pipeline, schema) + .apply(new JsonToBeamRow(options.getNonTokenizedDeadLetterPath(), schema)); + default: + throw new IllegalStateException( + "No valid format for input data is provided. Please, choose JSON or CSV."); + } + } + + private PCollection readJson(Pipeline pipeline) { + return pipeline.apply("ReadJsonFromFiles", TextIO.read().from(options.getInputFilePattern())); + } + + private PCollection readCsv(Pipeline pipeline, SchemasUtils schema) { + /* + * Step 1: Read CSV file(s) from File System using {@link CsvConverters.ReadCsv}. + */ + PCollectionTuple csvLines = readCsv(pipeline); + /* + * Step 2: Convert lines to Json. + */ + PCollectionTuple jsons = csvLineToJson(csvLines, schema.getJsonBeamSchema()); + + if (options.getNonTokenizedDeadLetterPath() != null) { + /* + * Step 3: Write jsons to dead-letter that weren't successfully processed. + */ + jsons + .get(PROCESSING_DEADLETTER_OUT) + .apply( + "WriteCsvConversionErrorsToFS", + ErrorConverters.WriteErrorsToTextIO.newBuilder() + .setErrorWritePath(options.getNonTokenizedDeadLetterPath()) + .setTranslateFunction(SerializableFunctions.getCsvErrorConverter()) + .build()); + } + + /* + * Step 4: Get jsons that were successfully processed. 
+ */ + return jsons + .get(PROCESSING_OUT) + .apply( + "GetJson", + MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload)); + } + + private PCollectionTuple readCsv(Pipeline pipeline) { + return pipeline.apply( + "ReadCsvFromFiles", + CsvConverters.ReadCsv.newBuilder() + .setCsvFormat(options.getCsvFormat()) + .setDelimiter(options.getCsvDelimiter()) + .setHasHeaders(options.getCsvContainsHeaders()) + .setInputFileSpec(options.getInputFilePattern()) + .setHeaderTag(CSV_HEADERS) + .setLineTag(CSV_LINES) + .build()); + } + + private PCollectionTuple csvLineToJson(PCollectionTuple csvLines, String jsonSchema) { + return csvLines.apply( + "LineToJson", + CsvConverters.LineToFailsafeJson.newBuilder() + .setDelimiter(options.getCsvDelimiter()) + .setJsonSchema(jsonSchema) + .setHeaderTag(CSV_HEADERS) + .setLineTag(CSV_LINES) + .setUdfOutputTag(PROCESSING_OUT) + .setUdfDeadletterTag(PROCESSING_DEADLETTER_OUT) + .build()); + } + + public PDone write(PCollection input, Schema schema) { + switch (options.getOutputFileFormat()) { + case JSON: + return writeJson(input); + case CSV: + return writeCsv(input, schema); + default: + throw new IllegalStateException( + "No valid format for output data is provided. Please, choose JSON or CSV."); + } + } + + private PDone writeJson(PCollection input) { + PCollection jsons = input.apply("RowsToJSON", ToJson.of()); + + if (jsons.isBounded() == IsBounded.BOUNDED) { + return jsons.apply("WriteToFS", TextIO.write().to(options.getOutputDirectory())); + } else { + return jsons.apply( + "WriteToFS", + TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutputDirectory())); + } + } + + private PDone writeCsv(PCollection input, Schema schema) { + String header = String.join(options.getCsvDelimiter(), schema.getFieldNames()); + String csvDelimiter = options.getCsvDelimiter(); + + PCollection csvs = + input.apply( + "ConvertToCSV", + MapElements.into(TypeDescriptors.strings()) + .via((Row inputRow) -> new RowToCsv(csvDelimiter).getCsvFromRow(inputRow))); + + if (csvs.isBounded() == IsBounded.BOUNDED) { + return csvs.apply( + "WriteToFS", TextIO.write().to(options.getOutputDirectory()).withHeader(header)); + } else { + return csvs.apply( + "WriteToFS", + TextIO.write() + .withWindowedWrites() + .withNumShards(1) + .to(options.getOutputDirectory()) + .withHeader(header)); + } + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/package-info.java new file mode 100644 index 0000000000000..0c1c03c0295c1 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2020 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** Protegrity Data Tokenization template for Google Cloud Teleport. 
*/ +package org.apache.beam.examples.complete.datatokenization.transforms.io; diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/package-info.java new file mode 100644 index 0000000000000..5530e633d727a --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2020 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** Protegrity Data Tokenization template for Google Cloud Teleport. */ +package org.apache.beam.examples.complete.datatokenization.transforms; diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java new file mode 100644 index 0000000000000..090abcb80eb50 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
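Note: a minimal sketch, not taken from the patch itself, of the file-system read/write halves defined above. "options", "pipeline" and "schemasUtils" are hypothetical placeholders for a configured DataTokenizationOptions, a Pipeline, and a parsed SchemasUtils instance.

    // Hypothetical wiring: read JSON or CSV into Beam Rows, then write them back out.
    // The input and output formats are chosen by the FileSystemPipelineOptions values.
    TokenizationFileSystemIO fileSystemIO = new TokenizationFileSystemIO(options);
    PCollection<Row> rows = fileSystemIO.read(pipeline, schemasUtils);
    // ... tokenization transforms would run here ...
    fileSystemIO.write(rows, schemasUtils.getBeamSchema());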
+ */ +package org.apache.beam.examples.complete.datatokenization.utils; + +import static org.apache.beam.examples.complete.datatokenization.utils.SchemasUtils.getGcsFileAsString; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.stream.JsonWriter; +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileIO.ReadableFile; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sample; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Common transforms for Csv files. */ +@SuppressWarnings({"argument.type.incompatible"}) +public class CsvConverters { + + /* Logger for class. */ + private static final Logger LOG = LoggerFactory.getLogger(CsvConverters.class); + + private static final String SUCCESSFUL_TO_JSON_COUNTER = "SuccessfulToJsonCounter"; + + private static final String FAILED_TO_JSON_COUNTER = "FailedToJsonCounter"; + + private static JsonParser jsonParser = new JsonParser(); + + /** + * Builds Json string from list of values and headers or values and schema if schema is provided. + * + * @param headers optional list of strings which is the header of the Csv file. + * @param values list of strings which are combined with header or json schema to create Json + * string. + * @param jsonSchemaString + * @return Json string containing object. + * @throws IOException thrown if Json object is not able to be written. + * @throws NumberFormatException thrown if value cannot be parsed into type successfully. 
+ */ + static String buildJsonString( + @Nullable List headers, List values, @Nullable String jsonSchemaString) + throws Exception { + + StringWriter stringWriter = new StringWriter(); + JsonWriter writer = new JsonWriter(stringWriter); + + if (jsonSchemaString != null) { + JsonArray jsonSchema = jsonParser.parse(jsonSchemaString).getAsJsonArray(); + writer.beginObject(); + + for (int i = 0; i < jsonSchema.size(); i++) { + JsonObject jsonObject = jsonSchema.get(i).getAsJsonObject(); + String type = jsonObject.get("type").getAsString().toUpperCase(); + writer.name(jsonObject.get("name").getAsString()); + + switch (type) { + case "LONG": + writer.value(Long.parseLong(values.get(i))); + break; + + case "DOUBLE": + writer.value(Double.parseDouble(values.get(i))); + break; + + case "INTEGER": + writer.value(Integer.parseInt(values.get(i))); + break; + + case "SHORT": + writer.value(Short.parseShort(values.get(i))); + break; + + case "BYTE": + writer.value(Byte.parseByte(values.get(i))); + break; + + case "FLOAT": + writer.value(Float.parseFloat(values.get(i))); + break; + + case "TEXT": + case "KEYWORD": + case "STRING": + writer.value(values.get(i)); + break; + + default: + LOG.error("Invalid data type, got: " + type); + throw new RuntimeException("Invalid data type, got: " + type); + } + } + writer.endObject(); + writer.close(); + return stringWriter.toString(); + + } else if (headers != null) { + + writer.beginObject(); + + for (int i = 0; i < headers.size(); i++) { + writer.name(headers.get(i)); + writer.value(values.get(i)); + } + + writer.endObject(); + writer.close(); + return stringWriter.toString(); + + } else { + LOG.error("No headers or schema specified"); + throw new RuntimeException("No headers or schema specified"); + } + } + + /** + * Gets Csv format accoring to Apache Commons CSV. If user + * passed invalid format error is thrown. + */ + public static CSVFormat getCsvFormat(String formatString, @Nullable String delimiter) { + + CSVFormat format = CSVFormat.Predefined.valueOf(formatString).getFormat(); + + // If a delimiter has been passed set it here. + if (delimiter != null) { + return format.withDelimiter(delimiter.charAt(0)); + } + return format; + } + + /** Necessary {@link PipelineOptions} options for Csv Pipelines. */ + public interface CsvPipelineOptions extends PipelineOptions { + @Description("Pattern to where data lives, ex: gs://mybucket/somepath/*.csv") + String getInputFileSpec(); + + void setInputFileSpec(String inputFileSpec); + + @Description("If file(s) contain headers") + Boolean getContainsHeaders(); + + void setContainsHeaders(Boolean containsHeaders); + + @Description("Deadletter table for failed inserts in form: :.") + String getDeadletterTable(); + + void setDeadletterTable(String deadletterTable); + + @Description("Delimiting character. Default: use delimiter provided in csvFormat") + @Default.InstanceFactory(DelimiterFactory.class) + String getDelimiter(); + + void setDelimiter(String delimiter); + + @Description( + "Csv format according to Apache Commons CSV format. 
Default is: Apache Commons CSV" + + " default\n" + + "https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.html#DEFAULT\n" + + "Must match format names exactly found at: " + + "https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.Predefined.html") + @Default.String("Default") + String getCsvFormat(); + + void setCsvFormat(String csvFormat); + + @Description("Optional: Path to JSON schema, ex gs://path/to/schema. ") + String getJsonSchemaPath(); + + void setJsonSchemaPath(String jsonSchemaPath); + + @Description("Set to true if number of files is in the tens of thousands. Default: false") + @Default.Boolean(false) + Boolean getLargeNumFiles(); + + void setLargeNumFiles(Boolean largeNumFiles); + } + + /** + * Default value factory to get delimiter from Csv format so that if the user does not pass one + * in, it matches the supplied {@link CsvPipelineOptions#getCsvFormat()}. + */ + public static class DelimiterFactory implements DefaultValueFactory { + + @Override + public String create(PipelineOptions options) { + CSVFormat csvFormat = getCsvFormat(options.as(CsvPipelineOptions.class).getCsvFormat(), null); + return String.valueOf(csvFormat.getDelimiter()); + } + } + + /** + * The {@link LineToFailsafeJson} interface converts a line from a Csv file into a Json string. + * Uses either: Javascript Udf, Json schema or the headers of the file to create the Json object + * which is then added to the {@link FailsafeElement} as the new payload. + */ + @AutoValue + public abstract static class LineToFailsafeJson + extends PTransform { + + public static Builder newBuilder() { + return new AutoValue_CsvConverters_LineToFailsafeJson.Builder(); + } + + public abstract String delimiter(); + + @Nullable + public abstract String jsonSchemaPath(); + + @Nullable + public abstract String jsonSchema(); + + public abstract TupleTag headerTag(); + + public abstract TupleTag lineTag(); + + public abstract TupleTag> udfOutputTag(); + + public abstract TupleTag> udfDeadletterTag(); + + @Override + public PCollectionTuple expand(PCollectionTuple lines) { + + PCollectionView headersView = null; + + // Convert csv lines into Failsafe elements so that we can recover over multiple transforms. 
+ PCollection> lineFailsafeElements = + lines + .get(lineTag()) + .apply("LineToFailsafeElement", ParDo.of(new LineToFailsafeElementFn())); + + // If no udf then use json schema + String schemaPath = jsonSchemaPath(); + if (schemaPath != null || jsonSchema() != null) { + + String schema; + if (schemaPath != null) { + schema = getGcsFileAsString(schemaPath); + } else { + schema = jsonSchema(); + } + + return lineFailsafeElements.apply( + "LineToDocumentUsingSchema", + ParDo.of( + new FailsafeElementToJsonFn( + headersView, schema, delimiter(), udfDeadletterTag())) + .withOutputTags(udfOutputTag(), TupleTagList.of(udfDeadletterTag()))); + } + + // Run if using headers + headersView = lines.get(headerTag()).apply(Sample.any(1)).apply(View.asSingleton()); + + PCollectionView finalHeadersView = headersView; + lines + .get(headerTag()) + .apply( + "CheckHeaderConsistency", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + String headers = c.sideInput(finalHeadersView); + if (!c.element().equals(headers)) { + LOG.error("Headers do not match, consistency cannot be guaranteed"); + throw new RuntimeException( + "Headers do not match, consistency cannot be guaranteed"); + } + } + }) + .withSideInputs(finalHeadersView)); + + return lineFailsafeElements.apply( + "LineToDocumentWithHeaders", + ParDo.of( + new FailsafeElementToJsonFn( + headersView, jsonSchemaPath(), delimiter(), udfDeadletterTag())) + .withSideInputs(headersView) + .withOutputTags(udfOutputTag(), TupleTagList.of(udfDeadletterTag()))); + } + + /** Builder for {@link LineToFailsafeJson}. */ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setDelimiter(String delimiter); + + public abstract Builder setJsonSchemaPath(String jsonSchemaPath); + + public abstract Builder setJsonSchema(String jsonSchema); + + public abstract Builder setHeaderTag(TupleTag headerTag); + + public abstract Builder setLineTag(TupleTag lineTag); + + public abstract Builder setUdfOutputTag( + TupleTag> udfOutputTag); + + public abstract Builder setUdfDeadletterTag( + TupleTag> udfDeadletterTag); + + public abstract LineToFailsafeJson build(); + } + } + + /** + * The {@link FailsafeElementToJsonFn} class creates a Json string from a failsafe element. + * + *

{@link FailsafeElementToJsonFn#FailsafeElementToJsonFn(PCollectionView, String, String, + * TupleTag)} + */ + public static class FailsafeElementToJsonFn + extends DoFn, FailsafeElement> { + + @Nullable public final String jsonSchema; + public final String delimiter; + public final TupleTag> udfDeadletterTag; + @Nullable private final PCollectionView headersView; + private Counter successCounter = + Metrics.counter(FailsafeElementToJsonFn.class, SUCCESSFUL_TO_JSON_COUNTER); + private Counter failedCounter = + Metrics.counter(FailsafeElementToJsonFn.class, FAILED_TO_JSON_COUNTER); + + FailsafeElementToJsonFn( + PCollectionView headersView, + String jsonSchema, + String delimiter, + TupleTag> udfDeadletterTag) { + this.headersView = headersView; + this.jsonSchema = jsonSchema; + this.delimiter = delimiter; + this.udfDeadletterTag = udfDeadletterTag; + } + + @ProcessElement + public void processElement(ProcessContext context) { + FailsafeElement element = context.element(); + List header = null; + + if (this.headersView != null) { + header = Arrays.asList(context.sideInput(this.headersView).split(this.delimiter)); + } + + List record = Arrays.asList(element.getOriginalPayload().split(this.delimiter)); + + try { + String json = buildJsonString(header, record, this.jsonSchema); + context.output(FailsafeElement.of(element.getOriginalPayload(), json)); + successCounter.inc(); + } catch (Exception e) { + failedCounter.inc(); + context.output( + this.udfDeadletterTag, + FailsafeElement.of(element) + .setErrorMessage(e.getMessage()) + .setStacktrace(Throwables.getStackTraceAsString(e))); + } + } + } + + /** + * The {@link LineToFailsafeElementFn} wraps an csv line with the {@link FailsafeElement} class so + * errors can be recovered from and the original message can be output to a error records table. + */ + static class LineToFailsafeElementFn extends DoFn> { + + @ProcessElement + public void processElement(ProcessContext context) { + String message = context.element(); + context.output(FailsafeElement.of(message, message)); + } + } + + /** + * The {@link ReadCsv} class is a {@link PTransform} that reads from one for more Csv files. The + * transform returns a {@link PCollectionTuple} consisting of the following {@link PCollection}: + * + *

+   * <ul>
+   *   <li>{@link ReadCsv#headerTag()} - Contains headers found in files if read with headers,
+   *       contains an empty {@link PCollection} if no headers.
+   *   <li>{@link ReadCsv#lineTag()} - Contains Csv lines as a {@link PCollection} of strings.
+   * </ul>
+ */ + @AutoValue + public abstract static class ReadCsv extends PTransform { + + public static Builder newBuilder() { + return new AutoValue_CsvConverters_ReadCsv.Builder(); + } + + public abstract String csvFormat(); + + @Nullable + public abstract String delimiter(); + + public abstract Boolean hasHeaders(); + + public abstract String inputFileSpec(); + + public abstract TupleTag headerTag(); + + public abstract TupleTag lineTag(); + + @Override + public PCollectionTuple expand(PBegin input) { + + if (hasHeaders()) { + return input + .apply("MatchFilePattern", FileIO.match().filepattern(inputFileSpec())) + .apply("ReadMatches", FileIO.readMatches()) + .apply( + "ReadCsvWithHeaders", + ParDo.of(new GetCsvHeadersFn(headerTag(), lineTag(), csvFormat(), delimiter())) + .withOutputTags(headerTag(), TupleTagList.of(lineTag()))); + } + + return PCollectionTuple.of( + lineTag(), input.apply("ReadCsvWithoutHeaders", TextIO.read().from(inputFileSpec()))); + } + + /** Builder for {@link ReadCsv}. */ + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setCsvFormat(String csvFormat); + + public abstract Builder setDelimiter(@Nullable String delimiter); + + public abstract Builder setHasHeaders(Boolean hasHeaders); + + public abstract Builder setInputFileSpec(String inputFileSpec); + + public abstract Builder setHeaderTag(TupleTag headerTag); + + public abstract Builder setLineTag(TupleTag lineTag); + + abstract ReadCsv autoBuild(); + + public ReadCsv build() { + + ReadCsv readCsv = autoBuild(); + + checkArgument(readCsv.inputFileSpec() != null, "Input file spec must be provided."); + + checkArgument(readCsv.csvFormat() != null, "Csv format must not be null."); + + checkArgument(readCsv.hasHeaders() != null, "Header information must be provided."); + + return readCsv; + } + } + } + + /** + * The {@link GetCsvHeadersFn} class gets the header of a Csv file and outputs it as a string. The + * csv format provided in {@link CsvConverters#getCsvFormat(String, String)} is used to get the + * header. 
+ */ + static class GetCsvHeadersFn extends DoFn { + + private final TupleTag headerTag; + private final TupleTag linesTag; + private CSVFormat csvFormat; + + GetCsvHeadersFn( + TupleTag headerTag, TupleTag linesTag, String csvFormat, String delimiter) { + this.headerTag = headerTag; + this.linesTag = linesTag; + this.csvFormat = getCsvFormat(csvFormat, delimiter); + } + + @ProcessElement + public void processElement(ProcessContext context, MultiOutputReceiver outputReceiver) { + ReadableFile f = context.element(); + String headers; + List records = null; + String delimiter = String.valueOf(this.csvFormat.getDelimiter()); + try { + String csvFileString = f.readFullyAsUTF8String(); + StringReader reader = new StringReader(csvFileString); + CSVParser parser = CSVParser.parse(reader, this.csvFormat.withFirstRecordAsHeader()); + records = + parser.getRecords().stream() + .map(i -> String.join(delimiter, i)) + .collect(Collectors.toList()); + headers = String.join(delimiter, parser.getHeaderNames()); + } catch (IOException ioe) { + LOG.error("Headers do not match, consistency cannot be guaranteed"); + throw new RuntimeException("Could not read Csv headers: " + ioe.getMessage()); + } + outputReceiver.get(this.headerTag).output(headers); + records.forEach(r -> outputReceiver.get(this.linesTag).output(r)); + } + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/DurationUtils.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/DurationUtils.java new file mode 100644 index 0000000000000..83d3aea3acb5e --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/DurationUtils.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.datatokenization.utils; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import java.util.Locale; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.MutablePeriod; +import org.joda.time.format.PeriodFormatterBuilder; +import org.joda.time.format.PeriodParser; + +/** + * The {@link DurationUtils} class provides common utilities for manipulating and formatting {@link + * Duration} objects. + */ +public class DurationUtils { + + /** + * Parses a duration from a period formatted string. Values are accepted in the following formats: + * + *

+   * <p>Formats:<br>
+   * Ns - Seconds. Example: 5s<br>
+   * Nm - Minutes. Example: 13m<br>
+   * Nh - Hours. Example: 2h
+   *
+   * <pre>
+   * parseDuration(null) = NullPointerException()
+   * parseDuration("")   = Duration.standardSeconds(0)
+   * parseDuration("2s") = Duration.standardSeconds(2)
+   * parseDuration("5m") = Duration.standardMinutes(5)
+   * parseDuration("3h") = Duration.standardHours(3)
+   * </pre>
+ * + * @param value The period value to parse. + * @return The {@link Duration} parsed from the supplied period string. + */ + public static Duration parseDuration(String value) { + checkNotNull(value, "The specified duration must be a non-null value!"); + + PeriodParser parser = + new PeriodFormatterBuilder() + .appendSeconds() + .appendSuffix("s") + .appendMinutes() + .appendSuffix("m") + .appendHours() + .appendSuffix("h") + .toParser(); + + MutablePeriod period = new MutablePeriod(); + parser.parseInto(period, value, 0, Locale.getDefault()); + + Duration duration = period.toDurationFrom(new DateTime(0)); + checkArgument(duration.getMillis() > 0, "The window duration must be greater than 0!"); + + return duration; + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java new file mode 100644 index 0000000000000..04beac038926c --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.datatokenization.utils; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.auto.value.AutoValue; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +/** Transforms & DoFns & Options for Teleport Error logging. 
*/ +public class ErrorConverters { + + /** Writes all Errors to GCS, place at the end of your pipeline. */ + @AutoValue + public abstract static class WriteStringMessageErrorsAsCsv + extends PTransform>, PDone> { + + public static Builder newBuilder() { + return new AutoValue_ErrorConverters_WriteStringMessageErrorsAsCsv.Builder(); + } + + public abstract String errorWritePath(); + + public abstract String csvDelimiter(); + + @Nullable + public abstract Duration windowDuration(); + + @SuppressWarnings("argument.type.incompatible") + @Override + public PDone expand(PCollection> pCollection) { + + PCollection formattedErrorRows = + pCollection.apply( + "GetFormattedErrorRow", ParDo.of(new FailedStringToCsvRowFn(csvDelimiter()))); + + if (pCollection.isBounded() == IsBounded.UNBOUNDED) { + if (windowDuration() != null) { + formattedErrorRows = + formattedErrorRows.apply(Window.into(FixedWindows.of(windowDuration()))); + } + return formattedErrorRows.apply( + TextIO.write().to(errorWritePath()).withNumShards(1).withWindowedWrites()); + + } else { + return formattedErrorRows.apply(TextIO.write().to(errorWritePath()).withNumShards(1)); + } + } + + /** Builder for {@link WriteStringMessageErrorsAsCsv}. */ + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setErrorWritePath(String errorWritePath); + + public abstract Builder setCsvDelimiter(String csvDelimiter); + + public abstract Builder setWindowDuration(@Nullable Duration duration); + + public abstract WriteStringMessageErrorsAsCsv build(); + } + } + + /** + * The {@link FailedStringToCsvRowFn} converts string objects which have failed processing into + * {@link String} objects contained CSV which can be output to a filesystem. + */ + public static class FailedStringToCsvRowFn extends DoFn, String> { + + /** + * The formatter used to convert timestamps into a BigQuery compatible format. + */ + private static final DateTimeFormatter TIMESTAMP_FORMATTER = + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + + private final String csvDelimiter; + + public FailedStringToCsvRowFn(String csvDelimiter) { + this.csvDelimiter = csvDelimiter; + } + + public FailedStringToCsvRowFn() { + this.csvDelimiter = ","; + } + + @ProcessElement + public void processElement(ProcessContext context) { + FailsafeElement failsafeElement = context.element(); + ArrayList outputRow = new ArrayList<>(); + final String message = failsafeElement.getOriginalPayload(); + + // Format the timestamp for insertion + String timestamp = + TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC)); + + outputRow.add(timestamp); + outputRow.add(failsafeElement.getErrorMessage()); + + // Only set the payload if it's populated on the message. + if (message != null) { + outputRow.add(message); + } + + context.output(String.join(csvDelimiter, outputRow)); + } + } + + /** Write errors as string encoded messages. 
*/ + @AutoValue + public abstract static class WriteStringMessageErrors + extends PTransform>, WriteResult> { + + public static Builder newBuilder() { + return new AutoValue_ErrorConverters_WriteStringMessageErrors.Builder(); + } + + public abstract String getErrorRecordsTable(); + + public abstract String getErrorRecordsTableSchema(); + + @Override + public WriteResult expand(PCollection> failedRecords) { + + return failedRecords + .apply("FailedRecordToTableRow", ParDo.of(new FailedStringToTableRowFn())) + .apply( + "WriteFailedRecordsToBigQuery", + BigQueryIO.writeTableRows() + .to(getErrorRecordsTable()) + .withJsonSchema(getErrorRecordsTableSchema()) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(WriteDisposition.WRITE_APPEND)); + } + + /** Builder for {@link WriteStringMessageErrors}. */ + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setErrorRecordsTable(String errorRecordsTable); + + public abstract Builder setErrorRecordsTableSchema(String errorRecordsTableSchema); + + public abstract WriteStringMessageErrors build(); + } + } + + /** + * The {@link FailedStringToTableRowFn} converts string objects which have failed processing into + * {@link TableRow} objects which can be output to a dead-letter table. + */ + public static class FailedStringToTableRowFn + extends DoFn, TableRow> { + + /** + * The formatter used to convert timestamps into a BigQuery compatible format. + */ + private static final DateTimeFormatter TIMESTAMP_FORMATTER = + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); + + @ProcessElement + public void processElement(ProcessContext context) { + FailsafeElement failsafeElement = context.element(); + final String message = failsafeElement.getOriginalPayload(); + + // Format the timestamp for insertion + String timestamp = + TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC)); + + // Build the table row + final TableRow failedRow = + new TableRow() + .set("timestamp", timestamp) + .set("errorMessage", failsafeElement.getErrorMessage()) + .set("stacktrace", failsafeElement.getStacktrace()); + + // Only set the payload if it's populated on the message. + if (message != null) { + failedRow + .set("payloadString", message) + .set("payloadBytes", message.getBytes(StandardCharsets.UTF_8)); + } + + context.output(failedRow); + } + } + + /** + * {@link WriteErrorsToTextIO} is a {@link PTransform} that writes strings error messages to file + * system using TextIO and custom line format {@link SerializableFunction} to convert errors in + * necessary format.
+   * Example of usage in pipeline:
+   *
+   * <pre>{@code
+   * pCollection.apply("Write to TextIO",
+   *   WriteErrorsToTextIO.<String, String>newBuilder()
+   *     .setErrorWritePath("errors.txt")
+   *     .setTranslateFunction((FailsafeElement<String, String> failsafeElement) -> {
+   *       ArrayList<String> outputRow = new ArrayList<>();
+   *       final String message = failsafeElement.getOriginalPayload();
+   *       String timestamp = Instant.now().toString();
+   *       outputRow.add(timestamp);
+   *       outputRow.add(failsafeElement.getErrorMessage());
+   *       outputRow.add(failsafeElement.getStacktrace());
+   *       // Only set the payload if it's populated on the message.
+   *       if (failsafeElement.getOriginalPayload() != null) {
+   *         outputRow.add(message);
+   *       }
+   *
+   *       return String.join(",", outputRow);
+   *     })
+   * }</pre>
+ */ + @AutoValue + public abstract static class WriteErrorsToTextIO + extends PTransform>, PDone> { + + public static WriteErrorsToTextIO.Builder newBuilder() { + return new AutoValue_ErrorConverters_WriteErrorsToTextIO.Builder<>(); + } + + public abstract String errorWritePath(); + + public abstract SerializableFunction, String> translateFunction(); + + @Nullable + public abstract Duration windowDuration(); + + @Override + @SuppressWarnings("argument.type.incompatible") + public PDone expand(PCollection> pCollection) { + + PCollection formattedErrorRows = + pCollection.apply( + "GetFormattedErrorRow", + MapElements.into(TypeDescriptors.strings()).via(translateFunction())); + + if (pCollection.isBounded() == PCollection.IsBounded.UNBOUNDED) { + if (windowDuration() == null) { + throw new RuntimeException("Unbounded input requires window interval to be set"); + } + return formattedErrorRows + .apply(Window.into(FixedWindows.of(windowDuration()))) + .apply(TextIO.write().to(errorWritePath()).withNumShards(1).withWindowedWrites()); + } + + return formattedErrorRows.apply(TextIO.write().to(errorWritePath()).withNumShards(1)); + } + + /** Builder for {@link WriteErrorsToTextIO}. */ + @AutoValue.Builder + public abstract static class Builder { + + public abstract WriteErrorsToTextIO.Builder setErrorWritePath(String errorWritePath); + + public abstract WriteErrorsToTextIO.Builder setTranslateFunction( + SerializableFunction, String> translateFunction); + + public abstract WriteErrorsToTextIO.Builder setWindowDuration( + @Nullable Duration duration); + + abstract SerializableFunction, String> translateFunction(); + + abstract WriteErrorsToTextIO autoBuild(); + + public WriteErrorsToTextIO build() { + checkNotNull(translateFunction(), "translateFunction is required."); + return autoBuild(); + } + } + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/FailsafeElement.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/FailsafeElement.java new file mode 100644 index 0000000000000..66f1c3c176a8f --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/FailsafeElement.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.datatokenization.utils; + +import java.util.Objects; +import org.apache.avro.reflect.Nullable; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; + +/** + * The {@link FailsafeElement} class holds the current value and original value of a record within a + * pipeline. 
This class allows pipelines to not lose valuable information about an incoming record + * throughout the processing of that record. The use of this class allows for more robust + * dead-letter strategies as the original record information is not lost throughout the pipeline and + * can be output to a dead-letter in the event of a failure during one of the pipelines transforms. + */ +@DefaultCoder(FailsafeElementCoder.class) +public class FailsafeElement { + + private final OriginalT originalPayload; + private final CurrentT payload; + @Nullable private String errorMessage = ""; + @Nullable private String stacktrace = ""; + + private FailsafeElement(OriginalT originalPayload, CurrentT payload) { + this.originalPayload = originalPayload; + this.payload = payload; + } + + public static FailsafeElement of( + OriginalT originalPayload, CurrentT currentPayload) { + return new FailsafeElement<>(originalPayload, currentPayload); + } + + public static FailsafeElement of( + FailsafeElement other) { + return new FailsafeElement<>(other.originalPayload, other.payload) + .setErrorMessage(other.getErrorMessage()) + .setStacktrace(other.getStacktrace()); + } + + public OriginalT getOriginalPayload() { + return originalPayload; + } + + public CurrentT getPayload() { + return payload; + } + + public String getErrorMessage() { + return errorMessage; + } + + public FailsafeElement setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + return this; + } + + public String getStacktrace() { + return stacktrace; + } + + public FailsafeElement setStacktrace(String stacktrace) { + this.stacktrace = stacktrace; + return this; + } + + @Override + public int hashCode() { + return Objects.hash(originalPayload, payload, errorMessage, stacktrace); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("originalPayload", originalPayload) + .add("payload", payload) + .add("errorMessage", errorMessage) + .add("stacktrace", stacktrace) + .toString(); + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/FailsafeElementCoder.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/FailsafeElementCoder.java new file mode 100644 index 0000000000000..151d98a9070ec --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/FailsafeElementCoder.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
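Note: a short sketch, not taken from the patch itself, of the FailsafeElement contract defined above. "rawCsvLine", "convertedJson" and "stackTraceString" are hypothetical strings.

    // The original payload travels alongside the converted payload, so a failure later
    // in the pipeline can still emit the untouched input to a dead-letter sink.
    FailsafeElement<String, String> element = FailsafeElement.of(rawCsvLine, convertedJson);
    // On failure, error details are attached without losing either payload.
    element = element.setErrorMessage("conversion failed").setStacktrace(stackTraceString);
    String original = element.getOriginalPayload(); // still the raw CSV line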
+ */ +package org.apache.beam.examples.complete.datatokenization.utils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeParameter; + +/** + * The {@link FailsafeElementCoder} encodes and decodes {@link FailsafeElement} objects. + * + *

This coder is necessary until Avro supports parameterized types (AVRO-1571) without requiring to + * explicitly specifying the schema for the type. + * + * @param The type of the original payload to be encoded. + * @param The type of the current payload to be encoded. + */ +public class FailsafeElementCoder + extends CustomCoder> { + + private static final NullableCoder STRING_CODER = NullableCoder.of(StringUtf8Coder.of()); + private final Coder originalPayloadCoder; + private final Coder currentPayloadCoder; + + private FailsafeElementCoder( + Coder originalPayloadCoder, Coder currentPayloadCoder) { + this.originalPayloadCoder = originalPayloadCoder; + this.currentPayloadCoder = currentPayloadCoder; + } + + public Coder getOriginalPayloadCoder() { + return originalPayloadCoder; + } + + public Coder getCurrentPayloadCoder() { + return currentPayloadCoder; + } + + public static FailsafeElementCoder of( + Coder originalPayloadCoder, Coder currentPayloadCoder) { + return new FailsafeElementCoder<>(originalPayloadCoder, currentPayloadCoder); + } + + @Override + public void encode(FailsafeElement value, OutputStream outStream) + throws IOException { + if (value == null) { + throw new CoderException("The FailsafeElementCoder cannot encode a null object!"); + } + + originalPayloadCoder.encode(value.getOriginalPayload(), outStream); + currentPayloadCoder.encode(value.getPayload(), outStream); + STRING_CODER.encode(value.getErrorMessage(), outStream); + STRING_CODER.encode(value.getStacktrace(), outStream); + } + + @Override + public FailsafeElement decode(InputStream inStream) throws IOException { + + OriginalT originalPayload = originalPayloadCoder.decode(inStream); + CurrentT currentPayload = currentPayloadCoder.decode(inStream); + String errorMessage = STRING_CODER.decode(inStream); + String stacktrace = STRING_CODER.decode(inStream); + + return FailsafeElement.of(originalPayload, currentPayload) + .setErrorMessage(errorMessage) + .setStacktrace(stacktrace); + } + + @Override + public List> getCoderArguments() { + return Arrays.asList(originalPayloadCoder, currentPayloadCoder); + } + + @Override + public TypeDescriptor> getEncodedTypeDescriptor() { + return new TypeDescriptor>() {}.where( + new TypeParameter() {}, originalPayloadCoder.getEncodedTypeDescriptor()) + .where(new TypeParameter() {}, currentPayloadCoder.getEncodedTypeDescriptor()); + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/RowToCsv.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/RowToCsv.java new file mode 100644 index 0000000000000..f8ab7d4ff39f6 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/RowToCsv.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
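Note: a short sketch, not taken from the patch itself, showing how the coder above might be registered so the SDK can encode FailsafeElement<String, String> values. "pipeline" is a hypothetical Pipeline instance.

    // Hypothetical registration: without an explicit coder, Beam cannot infer how to
    // serialize the parameterized FailsafeElement type.
    FailsafeElementCoder<String, String> coder =
        FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    pipeline.getCoderRegistry().registerCoderForType(coder.getEncodedTypeDescriptor(), coder);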
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.datatokenization.utils; + +import java.util.stream.Collectors; +import org.apache.beam.sdk.values.Row; + +/** The {@link RowToCsv} class to convert Beam Rows into strings in CSV format. */ +public class RowToCsv { + + private final String csvDelimiter; + + public RowToCsv(String csvDelimiter) { + this.csvDelimiter = csvDelimiter; + } + + public String getCsvFromRow(Row row) { + return row.getValues().stream() + .map(item -> item == null ? "null" : item) + .map(Object::toString) + .collect(Collectors.joining(csvDelimiter)); + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java new file mode 100644 index 0000000000000..e8dd24eabeeea --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/SchemasUtils.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.datatokenization.utils; + +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.fromTableSchema; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@link SchemasUtils} Class to read JSON based schema. Is there available to read from file or + * from string. Currently supported local File System and GCS. 
+ */
+@SuppressWarnings({
+  "initialization.fields.uninitialized",
+  "method.invocation.invalid",
+  "dereference.of.nullable",
+  "argument.type.incompatible",
+  "return.type.incompatible"
+})
+public class SchemasUtils {
+
+  /* Logger for class. */
+  private static final Logger LOG = LoggerFactory.getLogger(SchemasUtils.class);
+
+  private TableSchema bigQuerySchema;
+  private Schema beamSchema;
+  private String jsonBeamSchema;
+
+  public SchemasUtils(String schema) {
+    parseJson(schema);
+  }
+
+  public SchemasUtils(String path, Charset encoding) throws IOException {
+    if (path.startsWith("gs://")) {
+      parseJson(new String(readGcsFile(path), encoding));
+    } else {
+      byte[] encoded = Files.readAllBytes(Paths.get(path));
+      parseJson(new String(encoded, encoding));
+    }
+    LOG.info("Extracted schema: " + bigQuerySchema.toPrettyString());
+  }
+
+  public TableSchema getBigQuerySchema() {
+    return bigQuerySchema;
+  }
+
+  private void parseJson(String jsonSchema) throws UnsupportedOperationException {
+    TableSchema schema = BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
+    validateSchemaTypes(schema);
+    bigQuerySchema = schema;
+    jsonBeamSchema = BigQueryHelpers.toJsonString(schema.getFields());
+  }
+
+  private void validateSchemaTypes(TableSchema bigQuerySchema) {
+    try {
+      beamSchema = fromTableSchema(bigQuerySchema);
+    } catch (UnsupportedOperationException exception) {
+      LOG.error("Check the JSON schema, {}", exception.getMessage());
+    } catch (NullPointerException npe) {
+      LOG.error("Missing schema keywords, please check that all required fields are present");
+    }
+  }
+
+  /**
+   * Reads a schema file from GCS and returns the file contents as a byte array.
+   *
+   * @param gcsFilePath path to file in GCS in format "gs://your-bucket/path/to/file"
+   * @return byte array with file contents
+   * @throws IOException thrown if not able to read file
+   */
+  public static byte[] readGcsFile(String gcsFilePath) throws IOException {
+    LOG.info("Reading contents from GCS file: {}", gcsFilePath);
+    // Read the GCS file into a byte array; an I/O exception is thrown if the file is not found.
+    try (ReadableByteChannel readerChannel =
+        FileSystems.open(FileSystems.matchSingleFileSpec(gcsFilePath).resourceId())) {
+      try (InputStream stream = Channels.newInputStream(readerChannel)) {
+        return ByteStreams.toByteArray(stream);
+      }
+    }
+  }
+
+  public Schema getBeamSchema() {
+    return beamSchema;
+  }
+
+  public String getJsonBeamSchema() {
+    return jsonBeamSchema;
+  }
+
+  /**
+   * Reads a file from GCS and returns it as a string.
+   *
+   * @param filePath path to file in GCS
+   * @return contents of the file as a string
+   * @throws RuntimeException thrown if not able to read or match the file
+   */
+  public static String getGcsFileAsString(String filePath) {
+    MatchResult result;
+    try {
+      result = FileSystems.match(filePath);
+      checkArgument(
+          result.status() == MatchResult.Status.OK && !result.metadata().isEmpty(),
+          "Failed to match any files with the pattern: " + filePath);
+
+      List<ResourceId> rId =
+          result.metadata().stream()
+              .map(MatchResult.Metadata::resourceId)
+              .collect(Collectors.toList());
+
+      checkArgument(rId.size() == 1, "Expected exactly 1 file, but got " + rId.size() + " files.");
+
+      Reader reader =
+          Channels.newReader(FileSystems.open(rId.get(0)), StandardCharsets.UTF_8.name());
+
+      return CharStreams.toString(reader);
+
+    } catch (IOException ioe) {
+      LOG.error("File system i/o error: " + ioe.getMessage());
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  public static final String DEADLETTER_SCHEMA =
+      "{\n"
+          + "  \"fields\": [\n"
+          + "    {\n"
+          + "      \"name\": \"timestamp\",\n"
+          + "      \"type\": \"TIMESTAMP\",\n"
+          + "      \"mode\": \"REQUIRED\"\n"
+          + "    },\n"
+          + "    {\n"
+          + "      \"name\": \"payloadString\",\n"
+          + "      \"type\": \"STRING\",\n"
+          + "      \"mode\": \"REQUIRED\"\n"
+          + "    },\n"
+          + "    {\n"
+          + "      \"name\": \"payloadBytes\",\n"
+          + "      \"type\": \"BYTES\",\n"
+          + "      \"mode\": \"REQUIRED\"\n"
+          + "    },\n"
+          + "    {\n"
+          + "      \"name\": \"attributes\",\n"
+          + "      \"type\": \"RECORD\",\n"
+          + "      \"mode\": \"REPEATED\",\n"
+          + "      \"fields\": [\n"
+          + "        {\n"
+          + "          \"name\": \"key\",\n"
+          + "          \"type\": \"STRING\",\n"
+          + "          \"mode\": \"NULLABLE\"\n"
+          + "        },\n"
+          + "        {\n"
+          + "          \"name\": \"value\",\n"
+          + "          \"type\": \"STRING\",\n"
+          + "          \"mode\": \"NULLABLE\"\n"
+          + "        }\n"
+          + "      ]\n"
+          + "    },\n"
+          + "    {\n"
+          + "      \"name\": \"errorMessage\",\n"
+          + "      \"type\": \"STRING\",\n"
+          + "      \"mode\": \"NULLABLE\"\n"
+          + "    },\n"
+          + "    {\n"
+          + "      \"name\": \"stacktrace\",\n"
+          + "      \"type\": \"STRING\",\n"
+          + "      \"mode\": \"NULLABLE\"\n"
+          + "    }\n"
+          + "  ]\n"
+          + "}";
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/package-info.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/package-info.java
new file mode 100644
index 0000000000000..e53a4f7ce0462
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright (C) 2020 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/** Protegrity Data Tokenization template for Google Cloud Teleport.
*/ +package org.apache.beam.examples.complete.datatokenization.utils; diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/datatokenization/DataTokenizationTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/datatokenization/DataTokenizationTest.java new file mode 100644 index 0000000000000..a2b1ec05a2ec3 --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/complete/datatokenization/DataTokenizationTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.complete.datatokenization; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import org.apache.beam.examples.complete.datatokenization.options.DataTokenizationOptions; +import org.apache.beam.examples.complete.datatokenization.transforms.io.TokenizationFileSystemIO; +import org.apache.beam.examples.complete.datatokenization.transforms.io.TokenizationFileSystemIO.FORMAT; +import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElementCoder; +import org.apache.beam.examples.complete.datatokenization.utils.RowToCsv; +import org.apache.beam.examples.complete.datatokenization.utils.SchemasUtils; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Resources; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test class for {@link DataTokenization}. 
*/
+@RunWith(JUnit4.class)
+public class DataTokenizationTest {
+
+  private static final String testSchema =
+      "{\"fields\":[{\"mode\":\"REQUIRED\",\"name\":\"FieldName1\",\"type\":\"STRING\"},{\"mode\":\"REQUIRED\",\"name\":\"FieldName2\",\"type\":\"STRING\"}]}";
+  String[] fields = {"TestValue1", "TestValue2"};
+
+  @Rule public final transient TestPipeline testPipeline = TestPipeline.create();
+
+  private static final String RESOURCES_DIR = "./";
+
+  private static final String CSV_FILE_PATH =
+      Resources.getResource(RESOURCES_DIR + "testInput.csv").getPath();
+
+  private static final String JSON_FILE_PATH =
+      Resources.getResource(RESOURCES_DIR + "testInput.txt").getPath();
+
+  private static final String SCHEMA_FILE_PATH =
+      Resources.getResource(RESOURCES_DIR + "schema.txt").getPath();
+
+  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
+      FailsafeElementCoder.of(
+          NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));
+
+  @Test
+  public void testGetBeamSchema() {
+    Schema expectedSchema =
+        Schema.builder()
+            .addField("FieldName1", FieldType.STRING)
+            .addField("FieldName2", FieldType.STRING)
+            .build();
+    SchemasUtils schemasUtils = new SchemasUtils(testSchema);
+    Assert.assertEquals(expectedSchema, schemasUtils.getBeamSchema());
+  }
+
+  @Test
+  public void testGetBigQuerySchema() {
+    SchemasUtils schemasUtils = new SchemasUtils(testSchema);
+    Assert.assertEquals(testSchema, schemasUtils.getBigQuerySchema().toString());
+  }
+
+  @Test
+  public void testRowToCSV() {
+    Schema beamSchema = new SchemasUtils(testSchema).getBeamSchema();
+    Row.Builder rowBuilder = Row.withSchema(beamSchema);
+    Row row = rowBuilder.addValues(new ArrayList<>(Arrays.asList(fields))).build();
+    String csvResult = new RowToCsv(";").getCsvFromRow(row);
+    Assert.assertEquals(String.join(";", fields), csvResult);
+  }
+
+  @Test
+  public void testRowToCSVWithNull() {
+    final String nullableTestSchema =
+        "{\"fields\":[{\"mode\":\"REQUIRED\",\"name\":\"FieldName1\",\"type\":\"STRING\"},{\"mode\":\"NULLABLE\",\"name\":\"FieldName2\",\"type\":\"STRING\"}]}";
+    final String expectedCsv = "TestValueOne;null";
+
+    List<Object> values = Lists.newArrayList("TestValueOne", null);
+
+    Schema beamSchema = new SchemasUtils(nullableTestSchema).getBeamSchema();
+    Row.Builder rowBuilder = Row.withSchema(beamSchema);
+    Row row = rowBuilder.addValues(values).build();
+    String csvResult = new RowToCsv(";").getCsvFromRow(row);
+    Assert.assertEquals(expectedCsv, csvResult);
+  }
+
+  @Test
+  public void testFileSystemIOReadCSV() throws IOException {
+    PCollection<Row> jsons = fileSystemIORead(CSV_FILE_PATH, FORMAT.CSV);
+    assertRows(jsons);
+    testPipeline.run();
+  }
+
+  @Test
+  public void testFileSystemIOReadJSON() throws IOException {
+    PCollection<Row> jsons = fileSystemIORead(JSON_FILE_PATH, FORMAT.JSON);
+    assertRows(jsons);
+    testPipeline.run();
+  }
+
+  @Test
+  public void testJsonToRow() throws IOException {
+    PCollection<Row> rows = fileSystemIORead(JSON_FILE_PATH, FORMAT.JSON);
+    SchemasUtils testSchemaUtils = new SchemasUtils(SCHEMA_FILE_PATH, StandardCharsets.UTF_8);
+
+    PAssert.that(rows)
+        .satisfies(
+            x -> {
+              LinkedList<Row> beamRows = Lists.newLinkedList(x);
+              assertThat(beamRows, hasSize(3));
+              beamRows.forEach(
+                  row -> {
+                    List<Object> fieldValues = row.getValues();
+                    for (Object element : fieldValues) {
+                      assertThat((String) element, startsWith("FieldValue"));
+                    }
+                  });
+              return null;
+            });
+    testPipeline.run();
+  }
+
+  private PCollection<Row> fileSystemIORead(String inputGcsFilePattern, FORMAT inputGcsFileFormat)
+      throws IOException {
+    DataTokenizationOptions options =
+        PipelineOptionsFactory.create().as(DataTokenizationOptions.class);
+    options.setDataSchemaPath(SCHEMA_FILE_PATH);
+    options.setInputFilePattern(inputGcsFilePattern);
+    options.setInputFileFormat(inputGcsFileFormat);
+    if (inputGcsFileFormat == FORMAT.CSV) {
+      options.setCsvContainsHeaders(Boolean.FALSE);
+    }
+
+    SchemasUtils testSchemaUtils =
+        new SchemasUtils(options.getDataSchemaPath(), StandardCharsets.UTF_8);
+
+    CoderRegistry coderRegistry = testPipeline.getCoderRegistry();
+    coderRegistry.registerCoderForType(
+        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);
+    coderRegistry.registerCoderForType(
+        RowCoder.of(testSchemaUtils.getBeamSchema()).getEncodedTypeDescriptor(),
+        RowCoder.of(testSchemaUtils.getBeamSchema()));
+    /*
+     * Row/Row Coder for FailsafeElement.
+     */
+    FailsafeElementCoder<Row, Row> coder =
+        FailsafeElementCoder.of(
+            RowCoder.of(testSchemaUtils.getBeamSchema()),
+            RowCoder.of(testSchemaUtils.getBeamSchema()));
+    coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);
+
+    return new TokenizationFileSystemIO(options).read(testPipeline, testSchemaUtils);
+  }
+
+  private void assertRows(PCollection<Row> jsons) {
+    PAssert.that(jsons)
+        .satisfies(
+            x -> {
+              LinkedList<Row> rows = Lists.newLinkedList(x);
+              assertThat(rows, hasSize(3));
+              rows.forEach(
+                  row -> {
+                    assertNotNull(row.getSchema());
+                    assertThat(row.getSchema().getFields(), hasSize(3));
+                    assertThat(row.getSchema().getField(0).getName(), equalTo("Field1"));
+
+                    assertThat(row.getValues(), hasSize(3));
+                  });
+              return null;
+            });
+  }
+}
diff --git a/examples/java/src/test/resources/schema.txt b/examples/java/src/test/resources/schema.txt
new file mode 100644
index 0000000000000..10f5ae2eded7c
--- /dev/null
+++ b/examples/java/src/test/resources/schema.txt
@@ -0,0 +1,19 @@
+{
+  "fields": [
+    {
+      "mode": "REQUIRED",
+      "name": "Field1",
+      "type": "STRING"
+    },
+    {
+      "mode": "REQUIRED",
+      "name": "Field2",
+      "type": "STRING"
+    },
+    {
+      "mode": "REQUIRED",
+      "name": "Field3",
+      "type": "STRING"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/examples/java/src/test/resources/testInput.csv b/examples/java/src/test/resources/testInput.csv
new file mode 100644
index 0000000000000..9e88e1312b27a
--- /dev/null
+++ b/examples/java/src/test/resources/testInput.csv
@@ -0,0 +1,3 @@
+FieldValue11,FieldValue12,FieldValue13
+FieldValue21,FieldValue22,FieldValue23
+FieldValue31,FieldValue32,FieldValue33
\ No newline at end of file
diff --git a/examples/java/src/test/resources/testInput.txt b/examples/java/src/test/resources/testInput.txt
new file mode 100644
index 0000000000000..703469f53d182
--- /dev/null
+++ b/examples/java/src/test/resources/testInput.txt
@@ -0,0 +1,3 @@
+{"Field1": "FieldValue11", "Field2": "FieldValue12", "Field3": "FieldValue13"}
+{"Field1": "FieldValue12", "Field2": "FieldValue22", "Field3": "FieldValue23"}
+{"Field1": "FieldValue13", "Field2": "FieldValue32", "Field3": "FieldValue33"}
\ No newline at end of file
diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
index f113f5a9fe7f2..9eaff85e71cc6 100644
--- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
@@ -107,6 +107,7 @@
+
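
For readers who want to try the schema and CSV utilities above outside the test harness, here is a minimal standalone sketch, assuming the classes from this patch are on the classpath and a local schema.txt follows the BigQuery-style JSON layout used in the test resources; the file name, delimiter, and class name SchemaToCsvSketch are illustrative only, not part of the patch.

import java.nio.charset.StandardCharsets;
import org.apache.beam.examples.complete.datatokenization.utils.RowToCsv;
import org.apache.beam.examples.complete.datatokenization.utils.SchemasUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

public class SchemaToCsvSketch {
  public static void main(String[] args) throws Exception {
    // Parse a BigQuery-style JSON schema (local path or gs:// URI) into a Beam Schema.
    // "schema.txt" is a hypothetical local file with three REQUIRED STRING fields.
    SchemasUtils schemaUtils = new SchemasUtils("schema.txt", StandardCharsets.UTF_8);
    Schema beamSchema = schemaUtils.getBeamSchema();

    // Build a Row that matches the schema and render it as a delimited line.
    Row row =
        Row.withSchema(beamSchema)
            .addValues("FieldValue11", "FieldValue12", "FieldValue13")
            .build();
    System.out.println(new RowToCsv(";").getCsvFromRow(row));
    // Expected output: FieldValue11;FieldValue12;FieldValue13
  }
}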