tutorial to trigger dataflow jobs using cloud scheduler (#1396)
* tutorial to trigger dataflow jobs using cloud scheduler

* format the tutorial to fix circle ci checks

* change the title format

* address comments

* Templating and step-by-step instructions

* Enable APIs

* Add template compilation

* add the architecture diagram

* rename the build script

* address comments

* minor fixes

* address comments

* add cloudbuild sa setup

* add project iam admin role

* add dummy logic for dataflow job

* update sa setup

* first edit pass during readthrough

* second edit pass

Co-authored-by: zhong <zhongchen@google.com>
Co-authored-by: Todd Kopriva <43478937+ToddKopriva@users.noreply.github.com>
Co-authored-by: Jani Patokallio <jani@google.com>
4 people committed Aug 31, 2020
1 parent 6503756 commit 7249510
Showing 14 changed files with 444 additions and 0 deletions.
149 changes: 149 additions & 0 deletions tutorials/schedule-dataflow-jobs-with-cloud-scheduler/index.md
@@ -0,0 +1,149 @@
---
title: Schedule Dataflow batch jobs with Cloud Scheduler
description: Learn how to set up Cloud Scheduler to trigger your Dataflow batch jobs.
author: zhongchen
tags: Cloud Dataflow, Cloud Scheduler
date_published: 2020-08-31
---

[Cloud Dataflow](https://cloud.google.com/dataflow) is a managed service for running
streaming jobs and batch jobs. You can typically launch a streaming job once and not worry about operating it afterward.
Batch jobs, however, often need to be triggered on a schedule or when certain conditions are met.

In this tutorial, you learn how to set up a [Cloud Scheduler](https://cloud.google.com/scheduler/) job to trigger your
Dataflow batch jobs.

![high-level architecture diagram](https://storage.googleapis.com/gcp-community/tutorials/schedule-dataflow-jobs-with-cloud-scheduler/scheduler-dataflow-diagram.png)

You can find the code for this tutorial in the
[associated GitHub repository](https://github.com/GoogleCloudPlatform/community/blob/master/tutorials/schedule-dataflow-jobs-with-cloud-scheduler/scheduler-dataflow-demo).

## Dataflow templates

To run your Dataflow jobs on a regular basis, you first need to build Dataflow templates for them.

Follow the [instructions in the Dataflow documentation](https://cloud.google.com/dataflow/docs/guides/templates/creating-templates) to create your templates and
save them in a Cloud Storage bucket.

![Upload Dataflow templates in a Cloud Storage bucket](https://storage.googleapis.com/gcp-community/tutorials/schedule-dataflow-jobs-with-cloud-scheduler/store_a_template_in_gcs.png)
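
The build script in this tutorial's repository (shown later in this commit) stages the template with a single Maven command. The following is a minimal sketch of that command, assuming that the `BUCKET` environment variable is set and that the pipeline's main class is `DataflowDemoPipeline`, as in this tutorial's code:

```sh
mvn compile exec:java \
  -Dexec.mainClass=DataflowDemoPipeline \
  -Dexec.args="--runner=DataflowRunner \
    --project=${GOOGLE_CLOUD_PROJECT} \
    --stagingLocation=gs://${BUCKET}/staging \
    --gcpTempLocation=gs://${BUCKET}/temp \
    --region=us-central1 \
    --templateLocation=gs://${BUCKET}/templates/dataflow-demo-template"
```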

## Cloud Scheduler jobs

When your templates are ready, you can set up Cloud Scheduler jobs to launch them on a schedule.

Here's one example that defines a Cloud Scheduler job using Terraform:

```hcl-terraform
resource "google_cloud_scheduler_job" "scheduler" {
  name     = "scheduler-demo"
  schedule = "0 0 * * *"
  # This needs to be us-central1 even if App Engine is in us-central.
  # You get a "resource not found" error if you use us-central.
  region = "us-central1"

  http_target {
    http_method = "POST"
    uri         = "https://dataflow.googleapis.com/v1b3/projects/${var.project_id}/locations/${var.region}/templates:launch?gcsPath=gs://${var.bucket}/templates/dataflow-demo-template"

    oauth_token {
      service_account_email = google_service_account.cloud-scheduler-demo.email
    }

    # The request body must be base64-encoded.
    body = base64encode(<<-EOT
    {
      "jobName": "test-cloud-scheduler",
      "parameters": {
        "region": "${var.region}",
        "autoscalingAlgorithm": "THROUGHPUT_BASED"
      },
      "environment": {
        "maxWorkers": "10",
        "tempLocation": "gs://${var.bucket}/temp",
        "zone": "${var.region}-a"
      }
    }
    EOT
    )
  }
}
```
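
If you prefer not to manage the job with Terraform, you can create an equivalent job with the `gcloud` CLI. This is a sketch under assumptions: `launch-body.json` is a hypothetical file containing the same JSON payload as the Terraform `body` above, and `${SERVICE_ACCOUNT_EMAIL}` is the service account that the job authenticates as:

```sh
# Create a Cloud Scheduler job that POSTs the template launch request nightly.
gcloud scheduler jobs create http scheduler-demo \
  --schedule="0 0 * * *" \
  --uri="https://dataflow.googleapis.com/v1b3/projects/${GOOGLE_CLOUD_PROJECT}/locations/${REGION}/templates:launch?gcsPath=gs://${BUCKET}/templates/dataflow-demo-template" \
  --http-method=POST \
  --oauth-service-account-email="${SERVICE_ACCOUNT_EMAIL}" \
  --message-body-from-file=launch-body.json
```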

Cloud Scheduler jobs must be created in the region in which you have set up App Engine. In your
[Terraform script](https://www.terraform.io/docs/providers/google/r/cloud_scheduler_job.html#region), be sure to assign the right value to the `region` field:
you need to use `us-central1` if you have set up App Engine in `us-central`.

Use the [regional endpoint](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/create) to specify the region of the Dataflow
job. If you don't explicitly set the location in the request, jobs are created in the default region, `us-central1`.

## Create a Dataflow pipeline with Cloud Build

Follow these instructions to create a sample Dataflow pipeline with Cloud Build.
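
Before you begin, make sure that the required APIs are enabled in your project. The following command is a sketch; the exact set of APIs that you need may vary:

```sh
gcloud services enable \
  dataflow.googleapis.com \
  cloudscheduler.googleapis.com \
  cloudbuild.googleapis.com \
  appengine.googleapis.com
```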

1.  Open Cloud Shell and clone the repository:

        git clone https://github.com/GoogleCloudPlatform/community
        cd community/tutorials/schedule-dataflow-jobs-with-cloud-scheduler/scheduler-dataflow-demo/

1.  Create a Cloud Storage bucket, which will store Terraform state and Dataflow templates:

        export BUCKET=[YOUR_BUCKET_NAME]
        gsutil mb -p ${GOOGLE_CLOUD_PROJECT} gs://${BUCKET}

    Replace `[YOUR_BUCKET_NAME]` with a bucket name of your choice. The `${GOOGLE_CLOUD_PROJECT}` environment variable is predefined in Cloud Shell and contains your project ID.

1.  Create a backend for Terraform to store the state of your Google Cloud resources:

        cd terraform
        cat > backend.tf << EOF
        terraform {
          backend "gcs" {
            bucket = "${BUCKET}"
            prefix = "terraform/state"
          }
        }
        EOF

1.  Follow [these instructions](https://cloud.google.com/scheduler/docs/quickstart) to set up App Engine, which is required for
    creating Cloud Scheduler jobs.

    Cloud Scheduler jobs must be created in the same region as your App Engine application; a command-line sketch follows.
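
    The following command is a minimal sketch, assuming that you want your App Engine application (and therefore your Cloud Scheduler jobs) in the `us-central` region:

        gcloud app create --region=us-central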

1.  Set the region:

        export REGION=us-central1

    You need to set the region to `us-central1`, even though the region is shown as `us-central` in some parts of the Cloud Console.

    ![App Engine location](https://storage.googleapis.com/gcp-community/tutorials/schedule-dataflow-jobs-with-cloud-scheduler/app_engine_location.png)

1.  Follow
    [these instructions](https://cloud.google.com/cloud-build/docs/securing-builds/configure-access-for-cloud-build-service-account#granting_a_role_using_the_iam_page)
    to give the Cloud Build service account the following roles:

    - Cloud Scheduler Admin
    - Service Account Admin
    - Service Account User
    - Project IAM Admin

    Verify in the Cloud Console that all of the roles have been granted. A command-line alternative is sketched after the screenshot below.

    ![Cloud Build status](https://storage.googleapis.com/gcp-community/tutorials/schedule-dataflow-jobs-with-cloud-scheduler/cloudbuild_sa_setup.png)
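
    If you prefer `gcloud`, the following loop is a minimal sketch; `${PROJECT_NUMBER}` is an assumption for your project's number, which you can look up with `gcloud projects describe ${GOOGLE_CLOUD_PROJECT}`:

        # Grant each required role to the Cloud Build service account.
        for role in roles/cloudscheduler.admin roles/iam.serviceAccountAdmin \
            roles/iam.serviceAccountUser roles/resourcemanager.projectIamAdmin; do
          gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} \
            --member="serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com" \
            --role="${role}"
        done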

1.  Submit a Cloud Build job to create the resources:

        cd ..
        gcloud builds submit --config=cloudbuild.yaml \
          --substitutions=_BUCKET=${BUCKET},_REGION=${REGION},_PROJECT_ID=${GOOGLE_CLOUD_PROJECT} .

The job runs on the schedule that you defined in the Terraform script.

You can also run the Cloud Scheduler job manually from the Cloud Console and watch it trigger your Dataflow batch job.

You can check the status of jobs in the Cloud Console.

![See the status of your jobs](https://storage.googleapis.com/gcp-community/tutorials/schedule-dataflow-jobs-with-cloud-scheduler/check_scheduler_status.png)
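
You can also check job status from the command line; a quick sketch:

```sh
# List Cloud Scheduler jobs and their schedules.
gcloud scheduler jobs list

# List Dataflow jobs in the region that this tutorial uses.
gcloud dataflow jobs list --region=${REGION}
```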

## Cleaning up

Because this tutorial uses multiple Google Cloud components, be sure to delete the associated resources when you are done.
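
The following commands are a minimal cleanup sketch, assuming the resource names used in this tutorial; alternatively, you can run `terraform destroy` from the `terraform` directory (supplying the same variables) to remove everything that Terraform created:

```sh
# Delete the Cloud Scheduler job created by Terraform.
gcloud scheduler jobs delete scheduler-demo --quiet

# Delete the bucket that holds the Terraform state and the Dataflow template.
gsutil -m rm -r gs://${BUCKET}
```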
3 changes: 3 additions & 0 deletions tutorials/schedule-dataflow-jobs-with-cloud-scheduler/scheduler-dataflow-demo/README.md
@@ -0,0 +1,3 @@
# Cloud Scheduler & Dataflow Demo

This folder contains an example of how to set up a Cloud Scheduler job to trigger a Dataflow batch job.
29 changes: 29 additions & 0 deletions tutorials/schedule-dataflow-jobs-with-cloud-scheduler/scheduler-dataflow-demo/cloudbuild.yaml
@@ -0,0 +1,29 @@
# PROJECT_ID is a built-in Cloud Build substitution variable; _PROJECT_ID,
# _REGION, and _BUCKET are user-defined substitutions passed on the command line.
steps:
- id: 'Terraform init'
  name: 'hashicorp/terraform:0.12.20'
  dir: 'terraform'
  entrypoint: 'sh'
  args:
  - '-c'
  - |
    terraform init -input=false
    terraform workspace select scheduler-dataflow-demo || terraform workspace new scheduler-dataflow-demo
    terraform apply -input=false -var=project_id=${_PROJECT_ID} -var=region=${_REGION} \
      -var=bucket=${_BUCKET} -auto-approve
  waitFor: ['-']

- id: "Build dataflow template"
  name: maven:3.6.0-jdk-11-slim
  dir: 'dataflow'
  env:
  - "PROJECT=${_PROJECT_ID}"
  - "BUCKET=${_BUCKET}"
  - "REGION=${_REGION}"
  entrypoint: 'bash'
  args:
  - '-c'
  - |
    ./build.sh
  waitFor: ['Terraform init']

10 changes: 10 additions & 0 deletions tutorials/schedule-dataflow-jobs-with-cloud-scheduler/scheduler-dataflow-demo/dataflow/build.sh
@@ -0,0 +1,10 @@
#!/bin/bash

# Compile the pipeline and stage it as a classic Dataflow template in Cloud Storage.
mvn compile exec:java \
  -Dexec.mainClass=DataflowDemoPipeline \
  -Dexec.args="--runner=DataflowRunner \
    --project=${PROJECT} \
    --stagingLocation=gs://${BUCKET}/staging \
    --gcpTempLocation=gs://${BUCKET}/temp \
    --region=${REGION} \
    --templateLocation=gs://${BUCKET}/templates/dataflow-demo-template"
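
Once this script has staged the template, you can launch a one-off job from it to verify that it works. This is a sketch using the template path from the script above; `test-manual-launch` is a hypothetical job name:

```sh
gcloud dataflow jobs run test-manual-launch \
  --gcs-location=gs://${BUCKET}/templates/dataflow-demo-template \
  --region=${REGION}
```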
127 changes: 127 additions & 0 deletions tutorials/schedule-dataflow-jobs-with-cloud-scheduler/scheduler-dataflow-demo/dataflow/pom.xml
@@ -0,0 +1,127 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>dataflow</groupId>
  <artifactId>demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <beam.version>2.15.0</beam.version>
    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
    <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
    <slf4j.version>1.7.25</slf4j.version>
    <jackson.version>2.8.8</jackson.version>
  </properties>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>${maven-jar-plugin.version}</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>
              <!-- The entry point of this project's pipeline. -->
              <mainClass>DataflowDemoPipeline</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <profiles>
    <profile>
      <id>direct-runner</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
      <!-- Makes the DirectRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-direct-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>dataflow-runner</id>
      <!-- Makes the DataflowRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>

  <dependencies>
    <!-- Adds a dependency on the Beam SDK. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>28.1-jre</version>
    </dependency>

    <!-- The DirectRunner is needed for unit tests. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
      <scope>test</scope>
    </dependency>

    <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- Adds the slf4j API frontend binding with the JUL backend. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
      <!-- When loaded at runtime, this wires up slf4j to the JUL backend. -->
      <scope>runtime</scope>
    </dependency>
  </dependencies>
</project>
43 changes: 43 additions & 0 deletions tutorials/schedule-dataflow-jobs-with-cloud-scheduler/scheduler-dataflow-demo/dataflow/src/main/java/DataflowDemoPipeline.java
@@ -0,0 +1,43 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

public class DataflowDemoPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(DataflowDemoPipeline.class);

  public static void main(String[] args) {
    // Register the options class for this pipeline with the factory.
    PipelineOptionsFactory.register(DemoPipelineOptions.class);

    DemoPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(DemoPipelineOptions.class);

    Pipeline p = Pipeline.create(options);

    // A placeholder pipeline: create three elements and log each one.
    List<String> input = Arrays.asList("one", "two", "three");
    p.apply("dummy input", Create.of(input))
        .apply("dummy transformation", ParDo.of(
            new DoFn<String, String>() {
              @ProcessElement
              public void processElement(ProcessContext context) {
                String elem = context.element();
                LOG.info("Process element: " + elem);
                context.output(elem);
              }
            }));

    p.run();
  }
}
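
To iterate on this pipeline locally before staging it as a template, you can run it with the DirectRunner (active by default in the accompanying `pom.xml`). A sketch, run from the `dataflow` directory:

```sh
mvn compile exec:java \
  -Dexec.mainClass=DataflowDemoPipeline \
  -Dexec.args="--runner=DirectRunner --project=${GOOGLE_CLOUD_PROJECT}"
```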

11 changes: 11 additions & 0 deletions tutorials/schedule-dataflow-jobs-with-cloud-scheduler/scheduler-dataflow-demo/dataflow/src/main/java/DemoPipelineOptions.java
@@ -0,0 +1,11 @@
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ValueProvider;

public interface DemoPipelineOptions extends DataflowPipelineOptions {
  @Description("Subscription name")
  @Default.String("demo_subscription")
  ValueProvider<String> getSubscription();

  void setSubscription(ValueProvider<String> subscription);
}
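
Because `subscription` is a `ValueProvider`, its value can be supplied when the template is launched rather than when it is compiled. For example (a sketch; `my_subscription` and the job name are hypothetical values), you could override it when launching the staged template:

```sh
gcloud dataflow jobs run test-subscription-override \
  --gcs-location=gs://${BUCKET}/templates/dataflow-demo-template \
  --region=${REGION} \
  --parameters=subscription=my_subscription
```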