diff --git a/v2/bigquery-to-elasticsearch/README.md b/v2/bigquery-to-elasticsearch/README.md
new file mode 100644
index 0000000000000..945f1a7dee6b6
--- /dev/null
+++ b/v2/bigquery-to-elasticsearch/README.md
@@ -0,0 +1,114 @@
+# BigQuery to Elasticsearch Dataflow Template
+
+The [BigQueryToElasticsearch](src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToElasticsearch.java) pipeline ingests
+data from a BigQuery table into Elasticsearch. The template can either read the entire table or read using a supplied query.
+
+Pipeline flow is illustrated below:
+
+![alt text](img/bq-to-elasticsearch-dataflow.png "BQ to Elasticsearch pipeline flow")
+
+## Getting Started
+
+### Requirements
+* Java 8
+* Maven
+* BigQuery table exists
+* Elasticsearch nodes are reachable from the Dataflow workers
+
+### Building Template
+This is a Flex Template, meaning that the pipeline code will be containerized and the container will be
+run on Dataflow.
+
+#### Building Container Image
+* Set environment variables:
+```sh
+export PROJECT=my-project
+export IMAGE_NAME=my-image-name
+export BUCKET_NAME=gs://my-bucket
+export TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
+export BASE_CONTAINER_IMAGE=gcr.io/dataflow-templates-base/java8-template-launcher-base
+export BASE_CONTAINER_IMAGE_VERSION=latest
+export APP_ROOT=/template/
+export COMMAND_SPEC=${APP_ROOT}/resources/bigquery-to-elasticsearch-command-spec.json
+export NODE_ADDRESSES=comma-separated-list-nodes
+export INPUT_TABLE_SPEC=my-project:my-dataset.my-table
+export INDEX=my-index
+export DOCUMENT_TYPE=my-type
+```
+* Build and push the image to Google Container Registry:
+```sh
+mvn clean package -Dimage=${TARGET_GCR_IMAGE} \
+                  -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
+                  -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
+                  -Dapp-root=${APP_ROOT} \
+                  -Dcommand-spec=${COMMAND_SPEC}
+```
+
+#### Creating Image Spec
+
+Create a file in Cloud Storage containing the path to the container image in Google Container Registry:
+```json
+{
+  "docker_template_spec": {
+    "docker_image": "gcr.io/project/my-image-name"
+  }
+}
+```
+
+### Testing Template
+
+The template unit tests can be run using:
+```sh
+mvn test
+```
+
+### Executing Template
+
+The template requires the following parameters:
+* inputTableSpec: BigQuery table to read from, in the form my-project:my-dataset.my-table. Either this or query must be provided.
+* nodeAddresses: Comma-separated list of Elasticsearch nodes to connect to, ex: http://my-node1,http://my-node2
+* index: The index toward which the requests will be issued, ex: my-index
+* documentType: The document type toward which the requests will be issued, ex: my-document-type
+
+The template has the following optional parameters:
+* query: Query to run against the input table,
+    * For Standard SQL ex: 'SELECT max_temperature FROM \`clouddataflow-readonly.samples.weather_stations\`'
+    * For Legacy SQL ex: 'SELECT max_temperature FROM [clouddataflow-readonly:samples.weather_stations]'
+* useLegacySql: Set to true to use legacy SQL (only applicable if supplying query). Default: false
+* batchSize: Batch size in number of documents. Default: 1000
+* batchSizeBytes: Batch size in number of bytes. Default: 5242880 (5 MB)
+* maxRetryAttempts: Max retry attempts, must be > 0. Default: no retries
+* maxRetryDuration: Max retry duration in milliseconds, must be > 0. Default: no retries
+* usePartialUpdates: Set to true to issue partial updates. Default: false
+* idFnPath: Path to the JavaScript file containing the function to extract the Id from the document, ex: gs://path/to/idFn.js (see the sample UDF below). Default: null
+* idFnName: Name of the JavaScript function to extract the Id from the document. Default: null
+* indexFnPath: Path to the JavaScript file containing the function to extract the Index from the document, ex: gs://path/to/indexFn.js. Default: null
+* indexFnName: Name of the JavaScript function to extract the Index from the document. Default: null
+    * Will override the index provided.
+* typeFnPath: Path to the JavaScript file containing the function to extract the Type from the document, ex: gs://path/to/typeFn.js. Default: null
+* typeFnName: Name of the JavaScript function to extract the Type from the document. Default: null
+    * Will override the type provided.
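+
+Each of these functions receives the document as a JSON string and returns the extracted value as a string. The sketch below is illustrative only (the file name `idFn.js`, function name `getId`, and field `id` are assumptions, not part of this template):
+```js
+/**
+ * Illustrative id-extractor UDF: parses the incoming document JSON
+ * and returns its "id" field for use as the Elasticsearch document id.
+ */
+function getId(inJson) {
+  var doc = JSON.parse(inJson);
+  return doc.id;
+}
+```
+Such a file would then be referenced with idFnPath=gs://path/to/idFn.js and idFnName=getId.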
+
+Template can be executed using the following API call:
+```sh
+API_ROOT_URL="https://dataflow.googleapis.com"
+TEMPLATES_LAUNCH_API="${API_ROOT_URL}/v1b3/projects/${PROJECT}/templates:launch"
+JOB_NAME="bigquery-to-elasticsearch-`date +%Y%m%d-%H%M%S-%N`"
+time curl -X POST -H "Content-Type: application/json" \
+     -H "Authorization: Bearer $(gcloud auth print-access-token)" \
+     "${TEMPLATES_LAUNCH_API}"`
+     `"?validateOnly=false"`
+     `"&dynamicTemplate.gcsPath=gs://path/to/image/spec"`
+     `"&dynamicTemplate.stagingLocation=gs://path/to/stagingLocation" \
+     -d '
+      {
+       "jobName":"'$JOB_NAME'",
+       "parameters": {
+           "inputTableSpec":"'$INPUT_TABLE_SPEC'",
+           "nodeAddresses":"'$NODE_ADDRESSES'",
+           "index":"'$INDEX'",
+           "documentType":"'$DOCUMENT_TYPE'"
+        }
+       }
+      '
+```
diff --git a/v2/bigquery-to-elasticsearch/img/bq-to-elasticsearch-dataflow.png b/v2/bigquery-to-elasticsearch/img/bq-to-elasticsearch-dataflow.png
new file mode 100644
index 0000000000000..5cfceeee4e8bc
Binary files /dev/null and b/v2/bigquery-to-elasticsearch/img/bq-to-elasticsearch-dataflow.png differ
diff --git a/v2/bigquery-to-elasticsearch/pom.xml b/v2/bigquery-to-elasticsearch/pom.xml
new file mode 100644
index 0000000000000..5915859ad8d06
--- /dev/null
+++ b/v2/bigquery-to-elasticsearch/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>dynamic-templates</artifactId>
+    <groupId>com.google.cloud.teleport.v2</groupId>
+    <version>1.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>bigquery-to-elasticsearch</artifactId>
+
+  <properties>
+    <guava.version>20.0</guava.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.cloud.teleport.v2</groupId>
+      <artifactId>common</artifactId>
+      <version>1.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <directory>${mvn-target-dir}</directory>
+    <plugins>
+      <plugin>
+        <groupId>com.google.cloud.tools</groupId>
+        <artifactId>jib-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>build</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/v2/bigquery-to-elasticsearch/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToElasticsearch.java b/v2/bigquery-to-elasticsearch/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToElasticsearch.java
new file mode 100644
index 0000000000000..543e59db9ac7e
--- /dev/null
+++ b/v2/bigquery-to-elasticsearch/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToElasticsearch.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright (C) 2019 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates;
+
+import com.google.cloud.teleport.v2.transforms.BigQueryConverters.BigQueryReadOptions;
+import com.google.cloud.teleport.v2.transforms.BigQueryConverters.ReadBigQuery;
+import com.google.cloud.teleport.v2.transforms.BigQueryConverters.TableRowToJsonFn;
+import com.google.cloud.teleport.v2.transforms.ElasticsearchTransforms.WriteToElasticsearch;
+import com.google.cloud.teleport.v2.transforms.ElasticsearchTransforms.WriteToElasticsearchOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * The {@link BigQueryToElasticsearch} pipeline exports data from a BigQuery table to Elasticsearch.
+ *
+ * <p>Pipeline Requirements
+ *
+ * <ul>
+ *   <li>The BigQuery table exists.
+ *   <li>The Elasticsearch nodes are reachable from the Dataflow workers.
+ * </ul>
+ *
+ * <p>Example Usage
+ *
+ * <pre>
+ * # Set the pipeline vars
+ * PROJECT=my-project
+ * BUCKET_NAME=my-bucket
+ * TABLE=${PROJECT}:my-dataset.my-table
+ * NODE_ADDRESSES=comma-separated-list-nodes
+ * INDEX=my-index
+ * DOCUMENT_TYPE=my-type
+ *
+ * # Set containerization vars
+ * IMAGE_NAME=my-image-name
+ * TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
+ * BASE_CONTAINER_IMAGE=my-base-container-image
+ * BASE_CONTAINER_IMAGE_VERSION=my-base-container-image-version
+ * APP_ROOT=/path/to/app-root
+ * COMMAND_SPEC=/path/to/command-spec
+ *
+ * # Build and upload image
+ * mvn clean package \
+ * -Dimage=${TARGET_GCR_IMAGE} \
+ * -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
+ * -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
+ * -Dapp-root=${APP_ROOT} \
+ * -Dcommand-spec=${COMMAND_SPEC}
+ *
+ * # Create an image spec in GCS that contains the path to the image
+ * {
+ *    "docker_template_spec": {
+ *       "docker_image": "$TARGET_GCR_IMAGE"
+ *     }
+ *  }
+ *
+ * # Execute template:
+ * API_ROOT_URL="https://dataflow.googleapis.com"
+ * TEMPLATES_LAUNCH_API="${API_ROOT_URL}/v1b3/projects/${PROJECT}/templates:launch"
+ * JOB_NAME="bigquery-to-elasticsearch-`date +%Y%m%d-%H%M%S-%N`"
+ *
+ * time curl -X POST -H "Content-Type: application/json"     \
+ *     -H "Authorization: Bearer $(gcloud auth print-access-token)" \
+ *     "${TEMPLATES_LAUNCH_API}"`
+ *     `"?validateOnly=false"`
+ *     `"&dynamicTemplate.gcsPath=${BUCKET_NAME}/path/to/image-spec"`
+ *     `"&dynamicTemplate.stagingLocation=${BUCKET_NAME}/staging" \
+ *     -d '
+ *      {
+ *       "jobName":"'$JOB_NAME'",
+ *       "parameters": {
+ *            "inputTableSpec":"'$TABLE'",
+ *            "nodeAddresses":"'$NODE_ADDRESSES'",
+ *            "index":"'$INDEX'",
+ *            "documentType":"'$DOCUMENT_TYPE'"
+ *        }
+ *       }
+ *      '
+ * 
+ */ +public class BigQueryToElasticsearch { + + /** + * Main entry point for pipeline execution. + * + * @param args Command line arguments to the pipeline. + */ + public static void main(String[] args) { + BigQueryToElasticsearchReadOptions options = + PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(BigQueryToElasticsearchReadOptions.class); + + run(options); + } + + /** + * Runs the pipeline with the supplied options. + * + * @param options The execution parameters to the pipeline. + * @return The result of the pipeline execution. + */ + private static PipelineResult run(BigQueryToElasticsearchReadOptions options) { + + // Create the pipeline. + Pipeline pipeline = Pipeline.create(options); + + /* + * Steps: 1) Read records from BigQuery via BigQueryIO. + * 2) Create json string from Table Row. + * 3) Write records to Elasticsearch. + * + * + * Step #1: Read from BigQuery. If a query is provided then it is used to get the TableRows. + */ + pipeline + .apply( + "ReadFromBigQuery", + ReadBigQuery.newBuilder() + .setOptions(options.as(BigQueryToElasticsearchReadOptions.class)) + .build()) + + /* + * Step #2: Convert table rows to JSON documents. + */ + .apply("TableRowsToJsonDocument", ParDo.of(new TableRowToJsonFn())) + + /* + * Step #3: Write converted records to Elasticsearch + */ + .apply( + "WriteToElasticsearch", + WriteToElasticsearch.newBuilder() + .setOptions(options.as(WriteToElasticsearchOptions.class)) + .build()); + + return pipeline.run(); + } + + /** + * The {@link BigQueryToElasticsearchReadOptions} class provides the custom execution options + * passed by the executor at the command-line. + */ + public interface BigQueryToElasticsearchReadOptions + extends PipelineOptions, + BigQueryReadOptions, + WriteToElasticsearchOptions {} +} diff --git a/v2/bigquery-to-elasticsearch/src/main/java/com/google/cloud/teleport/v2/templates/package-info.java b/v2/bigquery-to-elasticsearch/src/main/java/com/google/cloud/teleport/v2/templates/package-info.java new file mode 100644 index 0000000000000..31afcdeef31ad --- /dev/null +++ b/v2/bigquery-to-elasticsearch/src/main/java/com/google/cloud/teleport/v2/templates/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2019 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Package for the BigQueryToElasticsearch template. 
*/ +package com.google.cloud.teleport.v2.templates; diff --git a/v2/bigquery-to-elasticsearch/src/main/resources/bigquery-to-elasticsearch-command-spec.json b/v2/bigquery-to-elasticsearch/src/main/resources/bigquery-to-elasticsearch-command-spec.json new file mode 100644 index 0000000000000..5762096be647d --- /dev/null +++ b/v2/bigquery-to-elasticsearch/src/main/resources/bigquery-to-elasticsearch-command-spec.json @@ -0,0 +1,4 @@ +{ + "mainClass": "com.google.cloud.teleport.v2.templates.BigQueryToElasticsearch", + "classPath": "/template/BigQueryToElasticsearch/libs/*:/template/BigQueryToElasticsearch/classes" +} diff --git a/v2/bigquery-to-elasticsearch/src/test/java/com/google/cloud/teleport/v2/templates/BigQueryToElasticsearchTest.java b/v2/bigquery-to-elasticsearch/src/test/java/com/google/cloud/teleport/v2/templates/BigQueryToElasticsearchTest.java new file mode 100644 index 0000000000000..a344302310f66 --- /dev/null +++ b/v2/bigquery-to-elasticsearch/src/test/java/com/google/cloud/teleport/v2/templates/BigQueryToElasticsearchTest.java @@ -0,0 +1,103 @@ +/* + * Copyright (C) 2019 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.cloud.teleport.v2.transforms.BigQueryConverters; +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** Test cases for {@link BigQueryToElasticsearch}. */ +public class BigQueryToElasticsearchTest { + + private static final TableRow tableRow = + new TableRow().set("id", "007").set("state", "CA").set("price", 26.23); + private static final List rows = ImmutableList.of(tableRow); + private static final String jsonifiedTableRow = + "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23}"; + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public ExpectedException exceptionRule = ExpectedException.none(); + + /** Test the {@link BigQueryToElasticsearch} pipeline end-to-end. 
*/ + @Test + public void testBigQueryToElasticsearchE2E() { + + BigQueryToElasticsearch.BigQueryToElasticsearchReadOptions options = + PipelineOptionsFactory.create() + .as(BigQueryToElasticsearch.BigQueryToElasticsearchReadOptions.class); + + options.setNodeAddresses("http://my-node"); + options.setIndex("test"); + options.setDocumentType("_doc"); + options.setInputTableSpec("my-project:my-dataset.my-table"); + options.setQuery(null); + + // Build pipeline + PCollection testStrings = + pipeline + .apply("CreateInput", Create.of(rows)) + .apply("TestTableRowToJson", ParDo.of(new BigQueryConverters.TableRowToJsonFn())); + + PAssert.that(testStrings) + .satisfies( + collection -> { + String result = collection.iterator().next(); + assertThat(result, is(equalTo(jsonifiedTableRow))); + return null; + }); + + // Execute pipeline + pipeline.run(); + } + + /** + * Tests that the {@link BigQueryToElasticsearch} pipeline throws an {@link + * IllegalArgumentException} when no query or input table spec is provided. + */ + @Test + public void testNoQueryOrInputTableSpec() { + exceptionRule.expect(IllegalArgumentException.class); + + BigQueryToElasticsearch.BigQueryToElasticsearchReadOptions options = + PipelineOptionsFactory.create() + .as(BigQueryToElasticsearch.BigQueryToElasticsearchReadOptions.class); + + options.setNodeAddresses("http://my-node"); + options.setIndex("test"); + options.setDocumentType("_doc"); + options.setInputTableSpec(null); + options.setQuery(null); + + // Build pipeline + pipeline.apply("CreateInput", Create.of(tableRow)); + + // Execute pipeline + pipeline.run(); + } +} diff --git a/v2/common/pom.xml b/v2/common/pom.xml index 9eb3d0280267b..af6b6d26cb352 100644 --- a/v2/common/pom.xml +++ b/v2/common/pom.xml @@ -32,6 +32,7 @@ 1.7 2.7 20.0 + 1.27.0 2.9.9 diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryConverters.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryConverters.java index 7c82b12010906..20ac1945be6f3 100644 --- a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryConverters.java +++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryConverters.java @@ -15,29 +15,110 @@ */ package com.google.cloud.teleport.v2.transforms; +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; + import com.google.api.services.bigquery.model.TableRow; import com.google.auto.value.AutoValue; +import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.JavascriptTextTransformerOptions; import com.google.cloud.teleport.v2.values.FailsafeElement; +import com.google.gson.Gson; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import org.apache.beam.sdk.coders.Coder.Context; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method; import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; import 
org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-/**
- * Common transforms for Teleport BigQueryIO.
- */
+/** Common transforms for Teleport BigQueryIO. */
 public class BigQueryConverters {
 
+  /* Logger for class. */
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryConverters.class);
+
+  /**
+   * Converts a JSON string to a {@link TableRow} object. If the data fails to convert, a {@link
+   * RuntimeException} will be thrown.
+   *
+   * @param json The JSON string to parse.
+   * @return The parsed {@link TableRow} object.
+   */
+  private static TableRow convertJsonToTableRow(String json) {
+    TableRow row;
+    // Parse the JSON into a {@link TableRow} object.
+    try (InputStream inputStream =
+        new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) {
+      row = TableRowJsonCoder.of().decode(inputStream, Context.OUTER);
+
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to serialize json to table row: " + json, e);
+    }
+
+    return row;
+  }
+
+  /**
+   * The {@link TableRowToJsonFn} class converts a {@link TableRow} to a JSON string using {@link
+   * #tableRowToJson(TableRow)}.
+   */
+  public static class TableRowToJsonFn extends DoFn<TableRow, String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      TableRow row = context.element();
+      context.output(tableRowToJson(row));
+    }
+  }
+
+  /** Converts a {@link TableRow} into a JSON string using {@link Gson}. */
+  private static String tableRowToJson(TableRow row) {
+    return new Gson().toJson(row, TableRow.class);
+  }
+
+  /**
+   * The {@link BigQueryReadOptions} interface contains options necessary to interface with
+   * BigQuery.
+   */
+  public interface BigQueryReadOptions extends PipelineOptions {
+    @Description("BigQuery table to export from in the form <project>:<dataset>.<table>")
+    String getInputTableSpec();
+
+    void setInputTableSpec(String inputTableSpec);
+
+    @Description(
+        "The dead-letter table to output to within BigQuery in <project-id>:<dataset-name>.<table-name> 
" + + "format. If it doesn't exist, it will be created during pipeline execution.") + String getOutputDeadletterTable(); + + void setOutputDeadletterTable(String outputDeadletterTable); + + @Description("Optional: Query to run against input table") + String getQuery(); + + void setQuery(String query); + + @Description("Set to true to use legacy SQL. Default:false") + @Default.Boolean(false) + Boolean getUseLegacySql(); + + void setUseLegacySql(Boolean useLegacySql); + } + /** * The {@link FailsafeJsonToTableRow} transform converts JSON strings to {@link TableRow} objects. * The transform accepts a {@link FailsafeElement} object so the original payload of the incoming @@ -47,17 +128,41 @@ public class BigQueryConverters { public abstract static class FailsafeJsonToTableRow extends PTransform>, PCollectionTuple> { + public static Builder newBuilder() { + return new AutoValue_BigQueryConverters_FailsafeJsonToTableRow.Builder<>(); + } + public abstract TupleTag successTag(); public abstract TupleTag> failureTag(); - public static Builder newBuilder() { - return new AutoValue_BigQueryConverters_FailsafeJsonToTableRow.Builder<>(); + @Override + public PCollectionTuple expand(PCollection> failsafeElements) { + return failsafeElements.apply( + "JsonToTableRow", + ParDo.of( + new DoFn, TableRow>() { + @ProcessElement + public void processElement(ProcessContext context) { + FailsafeElement element = context.element(); + String json = element.getPayload(); + + try { + TableRow row = convertJsonToTableRow(json); + context.output(row); + } catch (Exception e) { + context.output( + failureTag(), + FailsafeElement.of(element) + .setErrorMessage(e.getMessage()) + .setStacktrace(Throwables.getStackTraceAsString(e))); + } + } + }) + .withOutputTags(successTag(), TupleTagList.of(failureTag()))); } - /** - * Builder for {@link FailsafeJsonToTableRow}. - */ + /** Builder for {@link FailsafeJsonToTableRow}. */ @AutoValue.Builder public abstract static class Builder { @@ -67,52 +172,235 @@ public abstract static class Builder { public abstract FailsafeJsonToTableRow build(); } + } + + /** + * The {@link ReadBigQuery} class reads from BigQuery using {@link BigQueryIO}. The transform + * returns a {@link PCollection} of {@link TableRow}. 
+ */ + @AutoValue + public abstract static class ReadBigQuery extends PTransform> { + + public static Builder newBuilder() { + return new AutoValue_BigQueryConverters_ReadBigQuery.Builder(); + } + + public abstract BigQueryReadOptions options(); @Override - public PCollectionTuple expand(PCollection> failsafeElements) { - return failsafeElements.apply( - "JsonToTableRow", - ParDo.of( - new DoFn, TableRow>() { - @ProcessElement - public void processElement(ProcessContext context) { - FailsafeElement element = context.element(); - String json = element.getPayload(); - - try { - TableRow row = convertJsonToTableRow(json); - context.output(row); - } catch (Exception e) { - context.output( - failureTag(), - FailsafeElement.of(element) - .setErrorMessage(e.getMessage()) - .setStacktrace(Throwables.getStackTraceAsString(e))); - } - } - }) - .withOutputTags(successTag(), TupleTagList.of(failureTag()))); + public PCollection expand(PBegin pipeline) { + + if (options().getQuery() == null) { + LOG.info("No query provided, reading directly from: " + options().getInputTableSpec()); + return pipeline.apply( + "ReadFromBigQuery", + BigQueryIO.readTableRows() + .from(options().getInputTableSpec()) + .withTemplateCompatibility() + .withMethod(Method.DIRECT_READ) + .withCoder(TableRowJsonCoder.of())); + + } else { + LOG.info("Using query: " + options().getQuery()); + + if (!options().getUseLegacySql()) { + + LOG.info("Using Standard SQL"); + return pipeline.apply( + "ReadFromBigQueryWithQuery", + BigQueryIO.readTableRows() + .fromQuery(options().getQuery()) + .withTemplateCompatibility() + .usingStandardSql() + .withCoder(TableRowJsonCoder.of())); + } else { + + LOG.info("Using Legacy SQL"); + return pipeline.apply( + "ReadFromBigQueryWithQuery", + BigQueryIO.readTableRows() + .fromQuery(options().getQuery()) + .withTemplateCompatibility() + .withCoder(TableRowJsonCoder.of())); + } + } + } + + /** Builder for {@link ReadBigQuery}. */ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setOptions(BigQueryReadOptions options); + + abstract ReadBigQuery autoBuild(); + + public ReadBigQuery build() { + + ReadBigQuery readBigQuery = autoBuild(); + + if (readBigQuery.options().getInputTableSpec() == null) { + checkArgument( + readBigQuery.options().getQuery() != null, + "If no inputTableSpec is provided then a query is required."); + } + + if (readBigQuery.options().getQuery() == null) { + checkArgument( + readBigQuery.options().getInputTableSpec() != null, + "If no query is provided then an inputTableSpec is required."); + } + + return readBigQuery; + } } } /** - * Converts a JSON string to a {@link TableRow} object. If the data fails to convert, a {@link - * RuntimeException} will be thrown. + * The {@link TableRowToFailsafeJsonDocument} class is a {@link PTransform} which transforms {@link + * TableRow} objects into Json documents for insertion into Elasticsearch. Optionally a javascript + * UDF can be supplied to parse the {@link TableRow} object. The executions of the UDF and + * transformation to {@link TableRow} objects is done in a fail-safe way by wrapping the element + * with it's original payload inside the {@link FailsafeElement} class. The {@link + * TableRowToFailsafeJsonDocument} transform will output a {@link PCollectionTuple} which contains all + * output and dead-letter {@link PCollection}. * - * @param json The JSON string to parse. - * @return The parsed {@link TableRow} object. + *

<p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
+ *
+ * <ul>
+ *   <li>{@link TableRowToFailsafeJsonDocument#transformOutTag()} - Contains all {@link
+ *       FailsafeElement} records successfully converted from {@link TableRow} objects to JSON
+ *       documents.
+ *   <li>{@link TableRowToFailsafeJsonDocument#transformDeadletterOutTag()} - Contains all {@link
+ *       FailsafeElement} records which couldn't be converted to JSON documents.
+ * </ul>
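+ *
+ * <p>A minimal wiring sketch (the tag constants and {@code options} variable are illustrative,
+ * mirroring the unit tests for this transform):
+ *
+ * <pre>{@code
+ * PCollectionTuple tuple =
+ *     tableRows.apply(
+ *         "TableRowToDocument",
+ *         TableRowToFailsafeJsonDocument.newBuilder()
+ *             .setTransformOutTag(TRANSFORM_OUT)
+ *             .setTransformDeadletterOutTag(TRANSFORM_DEADLETTER_OUT)
+ *             .setUdfOutTag(UDF_OUT)
+ *             .setUdfDeadletterOutTag(UDF_DEADLETTER_OUT)
+ *             .setOptions(options.as(JavascriptTextTransformerOptions.class))
+ *             .build());
+ * }</pre>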
*/ - private static TableRow convertJsonToTableRow(String json) { - TableRow row; - // Parse the JSON into a {@link TableRow} object. - try (InputStream inputStream = - new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8))) { - row = TableRowJsonCoder.of().decode(inputStream, Context.OUTER); + @AutoValue + public abstract static class TableRowToFailsafeJsonDocument + extends PTransform, PCollectionTuple> { - } catch (IOException e) { - throw new RuntimeException("Failed to serialize json to table row: " + json, e); + public static Builder newBuilder() { + return new AutoValue_BigQueryConverters_TableRowToFailsafeJsonDocument.Builder(); } - return row; + public abstract JavascriptTextTransformerOptions options(); + + public abstract TupleTag> udfOutTag(); + + public abstract TupleTag> udfDeadletterOutTag(); + + public abstract TupleTag> transformOutTag(); + + public abstract TupleTag> transformDeadletterOutTag(); + + @Override + public PCollectionTuple expand(PCollection input) { + + PCollectionTuple udfOut; + + PCollectionTuple failsafeTableRows = + input.apply( + "TableRowToFailsafeElement", + ParDo.of(new TableRowToFailsafeElementFn(transformDeadletterOutTag())) + .withOutputTags(transformOutTag(), TupleTagList.of(transformDeadletterOutTag()))); + + // Use Udf to parse table rows if supplied. + if (options().getJavascriptTextTransformGcsPath() != null) { + udfOut = + failsafeTableRows + .get(transformOutTag()) + .apply( + "ProcessFailsafeRowsUdf", + JavascriptTextTransformer.FailsafeJavascriptUdf.newBuilder() + .setFileSystemPath(options().getJavascriptTextTransformGcsPath()) + .setFunctionName(options().getJavascriptTextTransformFunctionName()) + .setSuccessTag(udfOutTag()) + .setFailureTag(udfDeadletterOutTag()) + .build()); + + PCollection> failedOut = + PCollectionList.of(udfOut.get(udfDeadletterOutTag())) + .and(failsafeTableRows.get(transformDeadletterOutTag())) + .apply("FlattenFailedOut", Flatten.pCollections()); + + return PCollectionTuple.of(transformOutTag(), udfOut.get(udfOutTag())) + .and(transformDeadletterOutTag(), failedOut); + } else { + return failsafeTableRows; + } + } + + /** Builder for {@link TableRowToFailsafeJsonDocument}. */ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setOptions(JavascriptTextTransformerOptions options); + + public abstract Builder setTransformOutTag( + TupleTag> transformOutTag); + + public abstract Builder setTransformDeadletterOutTag( + TupleTag> transformDeadletterOutTag); + + public abstract Builder setUdfOutTag(TupleTag> udfOutTag); + + public abstract Builder setUdfDeadletterOutTag( + TupleTag> udfDeadletterOutTag); + + public abstract TableRowToFailsafeJsonDocument build(); + } + } + + /** + * The {@link TableRowToFailsafeElementFn} wraps an {@link TableRow} with the {@link + * FailsafeElement} class so errors can be recovered from and the original message can be output + * to a error records table. + */ + static class TableRowToFailsafeElementFn + extends DoFn> { + + private final TupleTag> transformDeadletterOutTag; + + /** {@link Counter} for successfully processed elements. */ + private Counter successCounter = + Metrics.counter(TableRowToFailsafeElementFn.class, "SuccessProcessCounter"); + + /** {@link Counter} for un-successfully processed elements. 
*/ + private Counter failedCounter = + Metrics.counter(TableRowToFailsafeElementFn.class, "FailedProcessCounter"); + + TableRowToFailsafeElementFn( + TupleTag> transformDeadletterOutTag) { + + this.transformDeadletterOutTag = transformDeadletterOutTag; + } + + @ProcessElement + public void processElement(ProcessContext context) { + TableRow row = context.element(); + try { + context.output(FailsafeElement.of(row, tableRowToJson(row))); + successCounter.inc(); + } catch (Exception e) { + context.output( + this.transformDeadletterOutTag, + FailsafeElement.of(row, row.toString()) + .setErrorMessage(e.getMessage()) + .setStacktrace(Throwables.getStackTraceAsString(e))); + failedCounter.inc(); + } + } + } + + /** + * The {@link FailsafeTableRowToFailsafeStringFn} converts a {@link FailsafeElement} containing a + * {@link TableRow} and string into a {@link FailsafeElement} containing two strings. The output + * {@link FailsafeElement#getOriginalPayload()} will return {@link TableRow#toString()}. + */ + public static class FailsafeTableRowToFailsafeStringFn + extends DoFn, FailsafeElement> { + + @ProcessElement + public void processElement(ProcessContext context) { + FailsafeElement element = context.element(); + context.output( + FailsafeElement.of(element.getOriginalPayload().toString(), element.getPayload())); + } } } diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ElasticsearchTransforms.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ElasticsearchTransforms.java index 50553b040e3cd..fb404e5d99899 100644 --- a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ElasticsearchTransforms.java +++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ElasticsearchTransforms.java @@ -17,17 +17,12 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; -import com.fasterxml.jackson.databind.JsonNode; import com.google.auto.value.AutoValue; -import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.JavascriptRuntime; -import java.io.IOException; +import com.google.cloud.teleport.v2.transforms.ValueExtractorTransform.ValueExtractorFn; import java.util.Optional; -import javax.annotation.Nullable; -import javax.script.ScriptException; import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO; import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration; import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration; -import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write.FieldValueExtractFn; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; @@ -215,112 +210,60 @@ public PDone expand(PCollection jsonStrings) { @AutoValue.Builder public abstract static class Builder { public abstract Builder setOptions(WriteToElasticsearchOptions options); + + abstract WriteToElasticsearchOptions options(); abstract WriteToElasticsearch autoBuild(); public WriteToElasticsearch build() { - WriteToElasticsearch write = autoBuild(); - checkArgument( - write.options().getNodeAddresses() != null, + options().getNodeAddresses() != null, "Node address(es) must not be null."); - checkArgument(write.options().getDocumentType() != null, + checkArgument(options().getDocumentType() != null, "Document type must not be null."); - checkArgument(write.options().getIndex() != null, + checkArgument(options().getIndex() != null, "Index must not be null."); 
checkArgument( - write.options().getBatchSize() > 0, - "Batch size must be > 0. Got: " + write.options().getBatchSize()); + options().getBatchSize() > 0, + "Batch size must be > 0. Got: " + options().getBatchSize()); checkArgument( - write.options().getBatchSizeBytes() > 0, - "Batch size bytes must be > 0. Got: " + write.options().getBatchSizeBytes()); + options().getBatchSizeBytes() > 0, + "Batch size bytes must be > 0. Got: " + options().getBatchSizeBytes()); /* Check that both {@link RetryConfiguration} parameters are supplied. */ - if (write.options().getMaxRetryAttempts() != null - || write.options().getMaxRetryDuration() != null) { + if (options().getMaxRetryAttempts() != null + || options().getMaxRetryDuration() != null) { checkArgument( - write.options().getMaxRetryDuration() != null - && write.options().getMaxRetryAttempts() != null, + options().getMaxRetryDuration() != null + && options().getMaxRetryAttempts() != null, "Both max retry duration and max attempts must be supplied."); } - if (write.options().getIdFnName() != null || write.options().getIdFnPath() != null) { + if (options().getIdFnName() != null || options().getIdFnPath() != null) { checkArgument( - write.options().getIdFnName() != null && write.options().getIdFnPath() != null, + options().getIdFnName() != null && options().getIdFnPath() != null, "Both IdFn name and path must be supplied."); } - if (write.options().getIndexFnName() != null || write.options().getIndexFnPath() != null) { + if (options().getIndexFnName() != null || options().getIndexFnPath() != null) { checkArgument( - write.options().getIndexFnName() != null && write.options().getIndexFnPath() != null, + options().getIndexFnName() != null && options().getIndexFnPath() != null, "Both IndexFn name and path must be supplied."); } - if (write.options().getTypeFnName() != null || write.options().getTypeFnPath() != null) { + if (options().getTypeFnName() != null || options().getTypeFnPath() != null) { checkArgument( - write.options().getTypeFnName() != null && write.options().getTypeFnPath() != null, + options().getTypeFnName() != null && options().getTypeFnPath() != null, "Both TypeFn name and path must be supplied."); } - return write; - } - } - } - - /** - * Class for routing functions that implements {@link FieldValueExtractFn}. {@link - * ValueExtractorFn#apply(JsonNode)} will return null if {@link ValueExtractorFn#functionName()} - * or {@link ValueExtractorFn#fileSystemPath()} are null meaning no function is applied to the - * document. - */ - @AutoValue - public abstract static class ValueExtractorFn implements FieldValueExtractFn { - public static Builder newBuilder() { - return new AutoValue_ElasticsearchTransforms_ValueExtractorFn.Builder(); - } - - @Nullable - abstract String functionName(); - - @Nullable - abstract String fileSystemPath(); - - @Override - public String apply(JsonNode input) { - if (functionName() == null && fileSystemPath() == null) { - return null; - } else { - checkArgument( - functionName() != null && fileSystemPath() != null, - "Both function name and file system path need to be set."); - } - - JavascriptRuntime runtime = - JavascriptRuntime.newBuilder() - .setFunctionName(functionName()) - .setFileSystemPath(fileSystemPath()) - .build(); - - try { - return runtime.invoke(input.toString()); - } catch (ScriptException | IOException | NoSuchMethodException e) { - throw new RuntimeException("Error in processing field value extraction: " + e.getMessage()); + return autoBuild(); } } - - /** Builder for {@link ValueExtractorFn}. 
*/ - @AutoValue.Builder - public abstract static class Builder { - public abstract Builder setFunctionName(String functionName); - - public abstract Builder setFileSystemPath(String fileSystemPath); - - public abstract ValueExtractorFn build(); - } } } diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ValueExtractorTransform.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ValueExtractorTransform.java new file mode 100644 index 0000000000000..292342303ca1f --- /dev/null +++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ValueExtractorTransform.java @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2019 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.transforms; + +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.auto.value.AutoValue; +import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.JavascriptRuntime; +import java.io.IOException; +import javax.annotation.Nullable; +import javax.script.ScriptException; +import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Write.FieldValueExtractFn; + +/** + * The {@link ValueExtractorTransform} allows for any Javascript function to be applied to a {@link JsonNode}. + * + * The transform takes in the path to the JS function via {@link ValueExtractorFn#fileSystemPath()} + * and the name of JS function to be applied via {@link ValueExtractorFn#functionName()}. The transform will + * return a {@link String} that is the result of the JS function. + */ +class ValueExtractorTransform { + + /** + * Class for routing functions that implements {@link FieldValueExtractFn}. {@link + * ValueExtractorFn#apply(JsonNode)} will return null if {@link ValueExtractorFn#functionName()} + * and {@link ValueExtractorFn#fileSystemPath()} are null meaning no function is applied to the + * document. + * + *

If only one of {@link ValueExtractorFn#functionName()} or {@link + * ValueExtractorFn#fileSystemPath()} are null then {@link com.google.api.gax.rpc.InvalidArgumentException} is thrown. + */ + @AutoValue + public abstract static class ValueExtractorFn implements FieldValueExtractFn { + public static Builder newBuilder() { + return new AutoValue_ValueExtractorTransform_ValueExtractorFn.Builder(); + } + + @Nullable + abstract String functionName(); + + @Nullable + abstract String fileSystemPath(); + + @Override + public String apply(JsonNode input) { + if (functionName() == null && fileSystemPath() == null) { + return null; + } else { + checkArgument( + functionName() != null && fileSystemPath() != null, + "Both function name and file system path need to be set."); + } + + JavascriptRuntime runtime = + JavascriptRuntime.newBuilder() + .setFunctionName(functionName()) + .setFileSystemPath(fileSystemPath()) + .build(); + + try { + return runtime.invoke(input.toString()); + } catch (ScriptException | IOException | NoSuchMethodException e) { + throw new RuntimeException("Error in processing field value extraction: " + e.getMessage()); + } + } + + /** Builder for {@link ValueExtractorFn}. */ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setFunctionName(String functionName); + + public abstract Builder setFileSystemPath(String fileSystemPath); + + public abstract ValueExtractorFn build(); + } + } +} diff --git a/v2/common/src/test/java/com/google/cloud/teleport/v2/transforms/BigQueryConvertersTest.java b/v2/common/src/test/java/com/google/cloud/teleport/v2/transforms/BigQueryConvertersTest.java index bd8c919fc67bc..edf48d49682c1 100644 --- a/v2/common/src/test/java/com/google/cloud/teleport/v2/transforms/BigQueryConvertersTest.java +++ b/v2/common/src/test/java/com/google/cloud/teleport/v2/transforms/BigQueryConvertersTest.java @@ -34,9 +34,12 @@ import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.NeedsRunner; @@ -56,16 +59,39 @@ @RunWith(JUnit4.class) public class BigQueryConvertersTest { - @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - @Rule public ExpectedException expectedException = ExpectedException.none(); + static final TableRow ROW = + new TableRow().set("id", "007").set("state", "CA").set("price", 26.23); + /** The tag for the main output of the json transformation. */ + static final TupleTag> TRANSFORM_OUT = + new TupleTag>() {}; + /** The tag for the dead-letter output of the json to table row transform. */ + static final TupleTag> TRANSFORM_DEADLETTER_OUT = + new TupleTag>() {}; + /** The tag for the main output of the json transformation. */ + static final TupleTag> UDF_OUT = + new TupleTag>() {}; + /** The tag for the dead-letter output of the json to table row transform. */ + static final TupleTag> UDF_TRANSFORM_DEADLETTER_OUT = + new TupleTag>() {}; + /** String/String Coder for FailsafeElement. 
*/ + static final FailsafeElementCoder FAILSAFE_ELEMENT_CODER = + FailsafeElementCoder.of( + NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of())); + /** TableRow/String Coder for FailsafeElement. */ + static final FailsafeElementCoder FAILSAFE_TABLE_ROW_ELEMENT_CODER = + FailsafeElementCoder.of(TableRowJsonCoder.of(), NullableCoder.of(StringUtf8Coder.of())); // Define the TupleTag's here otherwise the anonymous class will force the test method to // be serialized. private static final TupleTag TABLE_ROW_TAG = new TupleTag() {}; - private static final TupleTag> FAILSAFE_ELM_TAG = new TupleTag>() {}; - + private static final String jsonifiedTableRow = + "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23}"; + private static final String udfOutputRow = + "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23,\"someProp\":\"someValue\"}"; + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public ExpectedException expectedException = ExpectedException.none(); private ValueProvider entityKind = StaticValueProvider.of("TestEntity"); private ValueProvider uniqueNameColumn = StaticValueProvider.of("id"); private ValueProvider namespace = StaticValueProvider.of("bq-to-ds-test"); @@ -295,4 +321,69 @@ private Record generateNestedAvroRecord() { builder.set("address", addressBuilder); return builder.build(); } + + /** + * Tests {@link com.google.cloud.teleport.v2.transforms.BigQueryConverters.ReadBigQuery} throws + * exception when neither a query or input table is provided. + */ + @Test(expected = IllegalArgumentException.class) + public void testReadBigQueryInvalidInput() { + + BigQueryConverters.BigQueryReadOptions options = + PipelineOptionsFactory.create().as(BigQueryConverters.BigQueryReadOptions.class); + + options.setInputTableSpec(null); + options.setQuery(null); + + pipeline.apply(BigQueryConverters.ReadBigQuery.newBuilder().setOptions(options).build()); + + pipeline.run(); + } + + /** Tests that {@link BigQueryConverters.TableRowToFailsafeJsonDocument} transform returns the correct element. 
*/ + @Test + public void testTableRowToJsonDocument() { + CoderRegistry coderRegistry = pipeline.getCoderRegistry(); + + coderRegistry.registerCoderForType( + FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), + FAILSAFE_ELEMENT_CODER); + + coderRegistry.registerCoderForType( + FAILSAFE_TABLE_ROW_ELEMENT_CODER.getEncodedTypeDescriptor(), + FAILSAFE_TABLE_ROW_ELEMENT_CODER); + + BigQueryConverters.BigQueryReadOptions options = + PipelineOptionsFactory.create().as(BigQueryConverters.BigQueryReadOptions.class); + + options.setInputTableSpec(null); + options.setQuery(null); + + + PCollectionTuple testTuple = + pipeline + .apply("Create Input", Create.of(ROW).withCoder(TableRowJsonCoder.of())) + .apply( + "TestRowToDocument", + BigQueryConverters.TableRowToFailsafeJsonDocument.newBuilder() + .setTransformDeadletterOutTag(TRANSFORM_DEADLETTER_OUT) + .setTransformOutTag(TRANSFORM_OUT) + .setUdfDeadletterOutTag(UDF_TRANSFORM_DEADLETTER_OUT) + .setUdfOutTag(UDF_OUT) + .setOptions(options.as(JavascriptTextTransformer.JavascriptTextTransformerOptions.class)) + .build()); + + // Assert + PAssert.that(testTuple.get(TRANSFORM_OUT)).satisfies( + collection -> { + FailsafeElement element = collection.iterator().next(); + assertThat(element.getOriginalPayload(), is(equalTo(ROW))); + assertThat(element.getPayload(), is(equalTo(jsonifiedTableRow))); + return null; + } + ); + + // Execute pipeline + pipeline.run(); + } } diff --git a/v2/common/src/test/java/com/google/cloud/teleport/v2/transforms/ElasticsearchTransformsTest.java b/v2/common/src/test/java/com/google/cloud/teleport/v2/transforms/ElasticsearchTransformsTest.java index fa5f888a5945b..03deb5b201350 100644 --- a/v2/common/src/test/java/com/google/cloud/teleport/v2/transforms/ElasticsearchTransformsTest.java +++ b/v2/common/src/test/java/com/google/cloud/teleport/v2/transforms/ElasticsearchTransformsTest.java @@ -15,15 +15,7 @@ */ package com.google.cloud.teleport.v2.transforms; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.cloud.teleport.v2.transforms.ElasticsearchTransforms.WriteToElasticsearch; -import com.google.common.io.Resources; -import java.io.IOException; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -34,11 +26,6 @@ /** Test cases for the {@link ElasticsearchTransforms} class. */ public class ElasticsearchTransformsTest { - private static final String RESOURCES_DIR = "JavascriptTextTransformerTest/"; - - private static final String TRANSFORM_FILE_PATH = - Resources.getResource(RESOURCES_DIR + "transform.js").getPath(); - @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @Rule public ExpectedException exceptionRule = ExpectedException.none(); @@ -129,85 +116,7 @@ public void testInvalidNodeAddresses() { pipeline.run(); } - /** Tests that {@link ElasticsearchTransforms.ValueExtractorFn} returns the correct value. 
*/ - @Test - public void testValueExtractorFn() throws IOException { - - final String stringifiedJsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23}"; - - final String jsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23,\"someProp\":\"someValue\"}"; - - ObjectMapper objectMapper = new ObjectMapper(); - - JsonNode jsonNode = objectMapper.readTree(stringifiedJsonRecord); - - String result = ElasticsearchTransforms.ValueExtractorFn.newBuilder() - .setFileSystemPath(TRANSFORM_FILE_PATH) - .setFunctionName("transform") - .build() - .apply(jsonNode); - - assertThat(result, is(jsonRecord)); - } - - /** Tests that {@link ElasticsearchTransforms.ValueExtractorFn} returns null when both inputs are null. */ - @Test - public void testValueExtractorFnReturnNull() throws IOException { - - final String stringifiedJsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23}"; - - final String jsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23,\"someProp\":\"someValue\"}"; - - ObjectMapper objectMapper = new ObjectMapper(); - - JsonNode jsonNode = objectMapper.readTree(stringifiedJsonRecord); - String result = ElasticsearchTransforms.ValueExtractorFn.newBuilder() - .setFileSystemPath(null) - .setFunctionName(null) - .build() - .apply(jsonNode); - - assertThat(result, is(nullValue())); - } - - /** Tests that {@link ElasticsearchTransforms.ValueExtractorFn} throws exception if only {@link ElasticsearchTransforms.ValueExtractorFn#fileSystemPath()} is supplied. */ - @Test(expected = IllegalArgumentException.class) - public void testValueExtractorFnNullSystemPath() throws IOException { - - final String stringifiedJsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23}"; - - final String jsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23,\"someProp\":\"someValue\"}"; - - ObjectMapper objectMapper = new ObjectMapper(); - - JsonNode jsonNode = objectMapper.readTree(stringifiedJsonRecord); - - String result = ElasticsearchTransforms.ValueExtractorFn.newBuilder() - .setFileSystemPath(null) - .setFunctionName("transform") - .build() - .apply(jsonNode); - } - - /** Tests that {@link ElasticsearchTransforms.ValueExtractorFn} throws exception if only {@link ElasticsearchTransforms.ValueExtractorFn#functionName()} is supplied. */ - @Test(expected = IllegalArgumentException.class) - public void testValueExtractorFnNullFunctionName() throws IOException { - - final String stringifiedJsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23}"; - - final String jsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23,\"someProp\":\"someValue\"}"; - - ObjectMapper objectMapper = new ObjectMapper(); - - JsonNode jsonNode = objectMapper.readTree(stringifiedJsonRecord); - - String result = ElasticsearchTransforms.ValueExtractorFn.newBuilder() - .setFileSystemPath(TRANSFORM_FILE_PATH) - .setFunctionName(null) - .build() - .apply(jsonNode); - } /** Tests that {@link WriteToElasticsearch} throws an exception if {@link org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration} are invalid. */ @Test diff --git a/v2/common/src/test/java/com/google/cloud/teleport/v2/transforms/ValueExtractorTransformTest.java b/v2/common/src/test/java/com/google/cloud/teleport/v2/transforms/ValueExtractorTransformTest.java new file mode 100644 index 0000000000000..fd77b4ce2ac92 --- /dev/null +++ b/v2/common/src/test/java/com/google/cloud/teleport/v2/transforms/ValueExtractorTransformTest.java @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2019 Google Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.transforms; + +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.io.Resources; +import java.io.IOException; +import org.junit.Test; + +/** Tests for the {@link ValueExtractorTransform} transform. */ +public class ValueExtractorTransformTest { + + private static final String RESOURCES_DIR = "JavascriptTextTransformerTest/"; + + private static final String TRANSFORM_FILE_PATH = + Resources.getResource(RESOURCES_DIR + "transform.js").getPath(); + + /** Tests that {@link ValueExtractorTransform.ValueExtractorFn} returns the correct value. */ + @Test + public void testValueExtractorFn() throws IOException { + + final String stringifiedJsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23}"; + + final String jsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23,\"someProp\":\"someValue\"}"; + + ObjectMapper objectMapper = new ObjectMapper(); + + JsonNode jsonNode = objectMapper.readTree(stringifiedJsonRecord); + + String result = ValueExtractorTransform.ValueExtractorFn.newBuilder() + .setFileSystemPath(TRANSFORM_FILE_PATH) + .setFunctionName("transform") + .build() + .apply(jsonNode); + + assertThat(result, is(jsonRecord)); + } + + /** Tests that {@link ValueExtractorTransform.ValueExtractorFn} returns null when both inputs are null. */ + @Test + public void testValueExtractorFnReturnNull() throws IOException { + + final String stringifiedJsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23}"; + + final String jsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23,\"someProp\":\"someValue\"}"; + + ObjectMapper objectMapper = new ObjectMapper(); + + JsonNode jsonNode = objectMapper.readTree(stringifiedJsonRecord); + + String result = ValueExtractorTransform.ValueExtractorFn.newBuilder() + .setFileSystemPath(null) + .setFunctionName(null) + .build() + .apply(jsonNode); + + assertThat(result, is(nullValue())); + } + + /** Tests that {@link ValueExtractorTransform.ValueExtractorFn} throws exception if only {@link ValueExtractorTransform.ValueExtractorFn#fileSystemPath()} is supplied. 
*/ + @Test(expected = IllegalArgumentException.class) + public void testValueExtractorFnNullSystemPath() throws IOException { + + final String stringifiedJsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23}"; + + final String jsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23,\"someProp\":\"someValue\"}"; + + ObjectMapper objectMapper = new ObjectMapper(); + + JsonNode jsonNode = objectMapper.readTree(stringifiedJsonRecord); + + String result = ValueExtractorTransform.ValueExtractorFn.newBuilder() + .setFileSystemPath(null) + .setFunctionName("transform") + .build() + .apply(jsonNode); + } + + /** Tests that {@link ValueExtractorTransform.ValueExtractorFn} throws exception if only {@link ValueExtractorTransform.ValueExtractorFn#functionName()} is supplied. */ + @Test(expected = IllegalArgumentException.class) + public void testValueExtractorFnNullFunctionName() throws IOException { + + final String stringifiedJsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23}"; + + final String jsonRecord = "{\"id\":\"007\",\"state\":\"CA\",\"price\":26.23,\"someProp\":\"someValue\"}"; + + ObjectMapper objectMapper = new ObjectMapper(); + + JsonNode jsonNode = objectMapper.readTree(stringifiedJsonRecord); + + String result = ValueExtractorTransform.ValueExtractorFn.newBuilder() + .setFileSystemPath(TRANSFORM_FILE_PATH) + .setFunctionName(null) + .build() + .apply(jsonNode); + } +} diff --git a/v2/pom.xml b/v2/pom.xml index 5834c59210a1e..8c462689c587f 100644 --- a/v2/pom.xml +++ b/v2/pom.xml @@ -327,12 +327,33 @@ org.apache.beam:beam-runners-google-cloud-dataflow-java + + org.slf4j:slf4j-simple + + + com.google.code.findbugs:jsr305 + org.apache.beam:beam-runners-direct-java org.threeten:threetenbp + + org.apache.beam:beam-sdks-java-core + + + com.google.auto.value:auto-value-annotations + + + joda-time:joda-time + + + org.apache.beam:beam-vendor-guava-20_0 + + + org.apache.beam:beam-sdks-java-io-google-cloud-platform + @@ -552,6 +573,7 @@ kafka-to-bigquery common csv-to-elasticsearch + bigquery-to-elasticsearch