Merge pull request apache#61 from chrlarsen:bigquery-to-elasticsearch

PiperOrigin-RevId: 278792316
cloud-teleport committed Nov 6, 2019
2 parents a0ed600 + e17425d commit 289c5ba
Showing 15 changed files with 1,143 additions and 217 deletions.
114 changes: 114 additions & 0 deletions v2/bigquery-to-elasticsearch/README.md
@@ -0,0 +1,114 @@
# BigQuery to Elasticsearch Dataflow Template

The [BigQueryToElasticsearch](src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToElasticsearch.java) pipeline ingests
data from a BigQuery table into Elasticsearch. The template can either read the entire table or read using a supplied query.

The pipeline flow is illustrated below:

![alt text](img/bq-to-elasticsearch-dataflow.png "BQ to Elasticsearch pipeline flow")

## Getting Started

### Requirements
* Java 8
* Maven
* BigQuery table exists
* Elasticsearch nodes are reachable from the Dataflow workers

### Building Template
This is a Flex Template, meaning that the pipeline code is containerized and the container is run on Dataflow.

#### Building Container Image
* Set environment variables
```sh
export PROJECT=my-project
export IMAGE_NAME=my-image-name
export BUCKET_NAME=gs://<bucket-name>
export TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
export BASE_CONTAINER_IMAGE=gcr.io/dataflow-templates-base/java8-template-launcher-base
export BASE_CONTAINER_IMAGE_VERSION=latest
export APP_ROOT=/template/<template-class>
export COMMAND_SPEC=${APP_ROOT}/resources/bigquery-to-elasticsearch-command-spec.json
export NODE_ADDRESSES=comma-separated-list-nodes
export INPUT_TABLE_SPEC=my-project:my-dataset.my-table
export INDEX=my-index
export DOCUMENT_TYPE=my-type
```
* Build and push image to Google Container Registry
```sh
mvn clean package -Dimage=${TARGET_GCR_IMAGE} \
-Dbase-container-image=${BASE_CONTAINER_IMAGE} \
-Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
-Dapp-root=${APP_ROOT} \
-Dcommand-spec=${COMMAND_SPEC}
```
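
After the build completes, you can optionally verify that the image was pushed. A minimal check, assuming the ${PROJECT} and ${IMAGE_NAME} values exported above:
```sh
# List images in the project's Container Registry; the image built above should appear.
gcloud container images list --repository=gcr.io/${PROJECT}
```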

#### Creating Image Spec

Create a file in Cloud Storage containing the path to the container image in Google Container Registry.
```json
{
"docker_template_spec": {
"docker_image": "gcr.io/project/my-image-name"
}
}
```
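
If, for example, the spec above is saved locally as bigquery-to-elasticsearch-image-spec.json (a hypothetical file name), it can be uploaded to the bucket exported earlier with gsutil:
```sh
# Copy the image spec to Cloud Storage so it can be referenced via dynamicTemplate.gcsPath at launch time.
gsutil cp bigquery-to-elasticsearch-image-spec.json ${BUCKET_NAME}/images/bigquery-to-elasticsearch-image-spec.json
```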

### Testing Template

The template unit tests can be run using:
```sh
mvn test
```
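
To run a single test class rather than the full suite, the standard Maven Surefire -Dtest property can be used (the class name below is only illustrative):
```sh
mvn test -Dtest=BigQueryToElasticsearchTest
```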

### Executing Template

The template requires the following parameters:
* inputTableSpec: BigQuery table to read from, in the form my-project:my-dataset.my-table. Either this or query must be provided.
* nodeAddresses: Comma-separated list of Elasticsearch nodes to connect to, ex: http://my-node1,http://my-node2
* index: The index to which the requests will be issued, ex: my-index
* documentType: The document type to which the requests will be issued, ex: my-document-type
* useLegacySql: Set to true to use legacy SQL (only applicable when supplying query). Default: false

The template has the following optional parameters (an example of passing them in the launch request follows this list):
* query: Query to run against the input table.
    * For Standard SQL ex: 'SELECT max_temperature FROM \`clouddataflow-readonly.samples.weather_stations\`'
    * For Legacy SQL ex: 'SELECT max_temperature FROM [clouddataflow-readonly:samples.weather_stations]'
* batchSize: Batch size in number of documents. Default: 1000
* batchSizeBytes: Batch size in number of bytes. Default: 5242880 (5 MB)
* maxRetryAttempts: Maximum retry attempts; must be > 0. Default: no retries
* maxRetryDuration: Maximum retry duration in milliseconds; must be > 0. Default: no retries
* usePartialUpdates: Set to true to issue partial updates. Default: false
* idFnPath: Path to the JavaScript file containing the function to extract the document ID, ex: gs://path/to/idFn.js. Default: null
* idFnName: Name of the JavaScript function to extract the document ID. Default: null
* indexFnPath: Path to the JavaScript file containing the function to extract the index, ex: gs://path/to/indexFn.js. Default: null
* indexFnName: Name of the JavaScript function to extract the index. Overrides the index parameter. Default: null
* typeFnPath: Path to the JavaScript file containing the function to extract the document type, ex: gs://path/to/typeFn.js. Default: null
* typeFnName: Name of the JavaScript function to extract the document type. Overrides the documentType parameter. Default: null
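
As an illustration, optional parameters go in the same parameters map as the required ones and are passed as strings; all values below are placeholders:
```json
{
  "jobName": "my-job-name",
  "parameters": {
    "query": "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`",
    "nodeAddresses": "http://my-node1,http://my-node2",
    "index": "my-index",
    "documentType": "my-document-type",
    "batchSize": "2000",
    "usePartialUpdates": "true",
    "idFnPath": "gs://path/to/idFn.js",
    "idFnName": "myIdFn"
  }
}
```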

The template can be executed using the following API call:
```sh
API_ROOT_URL="https://dataflow.googleapis.com"
TEMPLATES_LAUNCH_API="${API_ROOT_URL}/v1b3/projects/${PROJECT}/templates:launch"
JOB_NAME="csv-to-elasticsearch-`date +%Y%m%d-%H%M%S-%N`"
time curl -X POST -H "Content-Type: application/json" \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
"${TEMPLATES_LAUNCH_API}"`
`"?validateOnly=false"`
`"&dynamicTemplate.gcsPath=gs://path/to/image/spec"`
`"&dynamicTemplate.stagingLocation=gs://path/to/stagingLocation" \
-d '
{
"jobName":"'$JOB_NAME'",
"parameters": {
"inputTableSpec":"'$INPUT_TABLE_SPEC'",
"nodeAddresses":"'$NODE_ADDRESSES'",
"index":"'$INDEX'",
"documentType":"'$DOCUMENT_TYPE'"
}
}
'
```
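
Once the request returns, the launched job can be looked up by name with gcloud (assuming the Cloud SDK is authenticated against the same project):
```sh
# Find the launched job and print its ID and current state.
gcloud dataflow jobs list --project=${PROJECT} --filter="name=${JOB_NAME}"
```
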
63 changes: 63 additions & 0 deletions v2/bigquery-to-elasticsearch/pom.xml
@@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~ Copyright (C) 2019 Google Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
~ use this file except in compliance with the License. You may obtain a copy of
~ the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations under
~ the License.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dynamic-templates</artifactId>
<groupId>com.google.cloud.teleport.v2</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>bigquery-to-elasticsearch</artifactId>

<properties>
<guava.version>20.0</guava.version>
</properties>

<dependencies>
<dependency>
<groupId>com.google.cloud.teleport.v2</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>

<build>
<directory>${mvn-target-dir}</directory>
<plugins>
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
164 changes: 164 additions & 0 deletions v2/bigquery-to-elasticsearch/src/main/java/com/google/cloud/teleport/v2/templates/BigQueryToElasticsearch.java
@@ -0,0 +1,164 @@
/*
* Copyright (C) 2019 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.v2.transforms.BigQueryConverters.BigQueryReadOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.ReadBigQuery;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.TableRowToJsonFn;
import com.google.cloud.teleport.v2.transforms.ElasticsearchTransforms.WriteToElasticsearch;
import com.google.cloud.teleport.v2.transforms.ElasticsearchTransforms.WriteToElasticsearchOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;

/**
* The {@link BigQueryToElasticsearch} pipeline exports data from a BigQuery table to Elasticsearch.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>BigQuery Table exists.
* <li>Elasticsearch node exists and is reachable by Dataflow workers.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT=my-project
* BUCKET_NAME=my-bucket
 * TABLE=${PROJECT}:my-dataset.my-table
* NODE_ADDRESSES=comma-separated-list-nodes
* INDEX=my-index
* DOCUMENT_TYPE=my-type
*
* # Set containerization vars
* IMAGE_NAME=my-image-name
* TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
* BASE_CONTAINER_IMAGE=my-base-container-image
* BASE_CONTAINER_IMAGE_VERSION=my-base-container-image-version
* APP_ROOT=/path/to/app-root
* COMMAND_SPEC=/path/to/command-spec
*
* # Build and upload image
* mvn clean package \
* -Dimage=${TARGET_GCR_IMAGE} \
* -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
* -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
* -Dapp-root=${APP_ROOT} \
* -Dcommand-spec=${COMMAND_SPEC}
*
* # Create an image spec in GCS that contains the path to the image
* {
* "docker_template_spec": {
* "docker_image": $TARGET_GCR_IMAGE
* }
* }
*
* # Execute template:
* API_ROOT_URL="https://dataflow.googleapis.com"
* TEMPLATES_LAUNCH_API="${API_ROOT_URL}/v1b3/projects/${PROJECT}/templates:launch"
* JOB_NAME="bigquery-to-elasticsearch-`date +%Y%m%d-%H%M%S-%N`"
*
* time curl -X POST -H "Content-Type: application/json" \
* -H "Authorization: Bearer $(gcloud auth print-access-token)" \
* "${TEMPLATES_LAUNCH_API}"`
* `"?validateOnly=false"`
* `"&dynamicTemplate.gcsPath=${BUCKET_NAME}/path/to/image-spec"`
* `"&dynamicTemplate.stagingLocation=${BUCKET_NAME}/staging" \
* -d '
* {
* "jobName":"'$JOB_NAME'",
* "parameters": {
* "inputTableSpec":"'$TABLE'",
* "nodeAddresses":"'$NODE_ADDRESSES'",
* "index":"'$INDEX'",
* "documentType":"'$DOCUMENT_TYPE'"
* }
* }
* '
* </pre>
*/
public class BigQueryToElasticsearch {

/**
* Main entry point for pipeline execution.
*
* @param args Command line arguments to the pipeline.
*/
public static void main(String[] args) {
BigQueryToElasticsearchReadOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(BigQueryToElasticsearchReadOptions.class);

run(options);
}

/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
private static PipelineResult run(BigQueryToElasticsearchReadOptions options) {

// Create the pipeline.
Pipeline pipeline = Pipeline.create(options);

/*
* Steps: 1) Read records from BigQuery via BigQueryIO.
* 2) Create json string from Table Row.
* 3) Write records to Elasticsearch.
*
*
* Step #1: Read from BigQuery. If a query is provided then it is used to get the TableRows.
*/
pipeline
.apply(
"ReadFromBigQuery",
ReadBigQuery.newBuilder()
.setOptions(options.as(BigQueryToElasticsearchReadOptions.class))
.build())

/*
* Step #2: Convert table rows to JSON documents.
*/
.apply("TableRowsToJsonDocument", ParDo.of(new TableRowToJsonFn()))

/*
* Step #3: Write converted records to Elasticsearch
*/
.apply(
"WriteToElasticsearch",
WriteToElasticsearch.newBuilder()
.setOptions(options.as(WriteToElasticsearchOptions.class))
.build());

return pipeline.run();
}

/**
* The {@link BigQueryToElasticsearchReadOptions} class provides the custom execution options
* passed by the executor at the command-line.
*/
public interface BigQueryToElasticsearchReadOptions
extends PipelineOptions,
BigQueryReadOptions,
WriteToElasticsearchOptions {}
}
18 changes: 18 additions & 0 deletions v2/bigquery-to-elasticsearch/src/main/java/com/google/cloud/teleport/v2/templates/package-info.java
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2019 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Package for the BigQueryToElasticsearch template. */
package com.google.cloud.teleport.v2.templates;
@@ -0,0 +1,4 @@
{
"mainClass": "com.google.cloud.teleport.v2.templates.BigQueryToElasticsearch",
"classPath": "/template/BigQueryToElasticsearch/libs/*:/template/BigQueryToElasticsearch/classes"
}