diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 07346dc0c0c3..808f6b1db1d4 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -74,5 +74,6 @@ /talent/**/* @GoogleCloudPlatform/python-samples-reviewers /vision/**/* @GoogleCloudPlatform/python-samples-reviewers /workflows/**/* @GoogleCloudPlatform/python-samples-reviewers -/datacatalog/**/* @GoogleCloudPlatform/python-samples-reviewers +/datacatalog/**/* @GoogleCloudPlatform/python-samples-reviewers /kms/**/** @GoogleCloudPlatform/dee-infra @GoogleCloudPlatform/python-samples-reviewers +/dataproc/**/** @GoogleCloudPlatform/cloud-dpes @GoogleCloudPlatform/python-samples-reviewers \ No newline at end of file
diff --git a/.github/blunderbuss.yml b/.github/blunderbuss.yml index ce3700dbaac7..c524caa7dca5 100644 --- a/.github/blunderbuss.yml +++ b/.github/blunderbuss.yml @@ -172,6 +172,10 @@ assign_prs_by: - 'api: cloudtasks' to: - GoogleCloudPlatform/infra-db-dpes +- labels: + - 'api: dataproc' + to: + - GoogleCloudPlatform/cloud-dpes assign_issues: - GoogleCloudPlatform/python-samples-owners
diff --git a/dataproc/snippets/README.md b/dataproc/snippets/README.md new file mode 100644 index 000000000000..98622be7dc16 --- /dev/null +++ b/dataproc/snippets/README.md @@ -0,0 +1,84 @@

# Cloud Dataproc API Examples

[![Open in Cloud Shell][shell_img]][shell_link]

[shell_img]: http://gstatic.com/cloudssh/images/open-btn.png
[shell_link]: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=dataproc/README.md

Sample command-line programs for interacting with the Cloud Dataproc API.

See [the tutorial on using the Dataproc API with the Python client library](https://cloud.google.com/dataproc/docs/tutorials/python-library-example) for a walkthrough you can run to try out the Cloud Dataproc API sample code.

Note that while these samples demonstrate interacting with Dataproc via the API, the same functionality can also be accomplished using the Cloud Console or the gcloud CLI.

`list_clusters.py` is a simple command-line program that demonstrates connecting to the Cloud Dataproc API and listing the clusters in a region.

`submit_job_to_cluster.py` demonstrates how to create a cluster, submit the `pyspark_sort.py` job, download the output from Google Cloud Storage, and output the result.

`single_job_workflow.py` uses the Cloud Dataproc InstantiateInlineWorkflowTemplate API to create an ephemeral cluster, run a job, then delete the cluster with one API request.

`pyspark_sort_gcs.py` is the same as `pyspark_sort.py` but demonstrates reading from a GCS bucket.

## Prerequisites to run locally:

* [pip](https://pypi.python.org/pypi/pip)

Go to the [Google Cloud Console](https://console.cloud.google.com).

Under API Manager, search for the Google Cloud Dataproc API and enable it.

## Set Up Your Local Dev Environment

To install, run the following commands. If you want to use [virtualenv](https://virtualenv.readthedocs.org/en/latest/) (recommended), run the commands within a virtualenv.

    * pip install -r requirements.txt

## Authentication

Please see the [Google Cloud authentication guide](https://cloud.google.com/docs/authentication/). The recommended approach for running these samples is to use a service account with a JSON key.
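For example, after downloading a service account key and pointing `GOOGLE_APPLICATION_CREDENTIALS` at it (e.g. `export GOOGLE_APPLICATION_CREDENTIALS=/path/to/key.json`, a placeholder path), you can optionally verify that Application Default Credentials resolve before running the samples. This check is not part of the samples themselves; it is a minimal sketch using `google-auth`, which is already listed in `requirements.txt`:

```python
import google.auth

# Resolve Application Default Credentials; raises DefaultCredentialsError
# if no service account key or gcloud user credentials are configured.
credentials, project_id = google.auth.default()
print("Authenticated; default project:", project_id)
```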
## Environment Variables

Set the following environment variables:

    GOOGLE_CLOUD_PROJECT=your-project-id
    REGION=us-central1 # or your region
    CLUSTER_NAME=your-cluster-name
    ZONE=us-central1-b

## Running the samples

To run list_clusters.py:

    python list_clusters.py --project_id=$GOOGLE_CLOUD_PROJECT --region=$REGION

`submit_job_to_cluster.py` creates a Dataproc cluster, runs a PySpark job on it, and deletes the cluster once the job completes, so you do not need to create a cluster beforehand. (To create a standalone cluster for the other samples, you can use the [Cloud Console](https://console.cloud.google.com) or run `gcloud dataproc clusters create your-cluster-name --region=$REGION`.)

To run submit_job_to_cluster.py, first create a GCS bucket (used by Cloud Dataproc to stage files) from the Cloud Console or with gsutil:

    gsutil mb gs://your-staging-bucket

Next, set the following environment variables:

    BUCKET=your-staging-bucket
    CLUSTER=your-cluster-name

Then run:

    python submit_job_to_cluster.py --project_id=$GOOGLE_CLOUD_PROJECT --region=$REGION --cluster_name=$CLUSTER --gcs_bucket=$BUCKET

The script will set up a cluster, upload the PySpark file, submit the job, print the result, and then delete the cluster.

Optionally, you can add the `--pyspark_file` argument to replace the default `pyspark_sort.py` included in this script with your own script.
diff --git a/dataproc/snippets/create_cluster.py b/dataproc/snippets/create_cluster.py new file mode 100644 index 000000000000..633b59e8cf4c --- /dev/null +++ b/dataproc/snippets/create_cluster.py @@ -0,0 +1,73 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This sample walks a user through creating a Cloud Dataproc cluster using
# the Python client library.
#
# This script can be run on its own:
# python create_cluster.py ${PROJECT_ID} ${REGION} ${CLUSTER_NAME}


import sys

# [START dataproc_create_cluster]
from google.cloud import dataproc_v1 as dataproc


def create_cluster(project_id, region, cluster_name):
    """This sample walks a user through creating a Cloud Dataproc cluster
    using the Python client library.

    Args:
        project_id (string): Project to use for creating resources.
        region (string): Region where the resources should live.
        cluster_name (string): Name to use for creating a cluster.
    """

    # Create a client with the endpoint set to the desired cluster region.
    cluster_client = dataproc.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
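    # Note: this config is intentionally minimal -- one master and two workers,
    # both n1-standard-2. The cluster "config" dictionary also accepts optional
    # fields such as "software_config" (for example, to pin a Dataproc image
    # version) and "gce_cluster_config" (for example, to choose a network or
    # zone); they are omitted here to keep the sample focused.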
+ cluster = { + "project_id": project_id, + "cluster_name": cluster_name, + "config": { + "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"}, + "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"}, + }, + } + + # Create the cluster. + operation = cluster_client.create_cluster( + request={"project_id": project_id, "region": region, "cluster": cluster} + ) + result = operation.result() + + # Output a success message. + print(f"Cluster created successfully: {result.cluster_name}") + # [END dataproc_create_cluster] + + +if __name__ == "__main__": + if len(sys.argv) < 4: + sys.exit("python create_cluster.py project_id region cluster_name") + + project_id = sys.argv[1] + region = sys.argv[2] + cluster_name = sys.argv[3] + create_cluster(project_id, region, cluster_name) diff --git a/dataproc/snippets/create_cluster_test.py b/dataproc/snippets/create_cluster_test.py new file mode 100644 index 000000000000..48941a1c5dbb --- /dev/null +++ b/dataproc/snippets/create_cluster_test.py @@ -0,0 +1,57 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import uuid + +from google.api_core.exceptions import NotFound +from google.cloud import dataproc_v1 as dataproc +import pytest + +import create_cluster + + +PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] +REGION = "us-central1" +CLUSTER_NAME = "py-cc-test-{}".format(str(uuid.uuid4())) + + +@pytest.fixture(autouse=True) +def teardown(): + yield + + cluster_client = dataproc.ClusterControllerClient( + client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"} + ) + # Client library function + try: + operation = cluster_client.delete_cluster( + request={ + "project_id": PROJECT_ID, + "region": REGION, + "cluster_name": CLUSTER_NAME, + } + ) + # Wait for cluster to delete + operation.result() + except NotFound: + print("Cluster already deleted") + + +def test_cluster_create(capsys): + # Wrapper function for client library function + create_cluster.create_cluster(PROJECT_ID, REGION, CLUSTER_NAME) + + out, _ = capsys.readouterr() + assert CLUSTER_NAME in out diff --git a/dataproc/snippets/dataproc_e2e_donttest.py b/dataproc/snippets/dataproc_e2e_donttest.py new file mode 100644 index 000000000000..462a8850d0c4 --- /dev/null +++ b/dataproc/snippets/dataproc_e2e_donttest.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Integration tests for Dataproc samples. 
+ +Creates a Dataproc cluster, uploads a pyspark file to Google Cloud Storage, +submits a job to Dataproc that runs the pyspark file, then downloads +the output logs from Cloud Storage and verifies the expected output.""" + +import os + +import submit_job_to_cluster + +PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] +BUCKET = os.environ["CLOUD_STORAGE_BUCKET"] +CLUSTER_NAME = "testcluster3" +ZONE = "us-central1-b" + + +def test_e2e(): + output = submit_job_to_cluster.main(PROJECT, ZONE, CLUSTER_NAME, BUCKET) + assert b"['Hello,', 'dog', 'elephant', 'panther', 'world!']" in output diff --git a/dataproc/snippets/instantiate_inline_workflow_template.py b/dataproc/snippets/instantiate_inline_workflow_template.py new file mode 100644 index 000000000000..cbb1a2186e2d --- /dev/null +++ b/dataproc/snippets/instantiate_inline_workflow_template.py @@ -0,0 +1,97 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This sample walks a user through instantiating an inline +# workflow for Cloud Dataproc using the Python client library. +# +# This script can be run on its own: +# python instantiate_inline_workflow_template.py ${PROJECT_ID} ${REGION} + + +import sys + +# [START dataproc_instantiate_inline_workflow_template] +from google.cloud import dataproc_v1 as dataproc + + +def instantiate_inline_workflow_template(project_id, region): + """This sample walks a user through submitting a workflow + for a Cloud Dataproc using the Python client library. + + Args: + project_id (string): Project to use for running the workflow. + region (string): Region where the workflow resources should live. + """ + + # Create a client with the endpoint set to the desired region. + workflow_template_client = dataproc.WorkflowTemplateServiceClient( + client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"} + ) + + parent = "projects/{}/regions/{}".format(project_id, region) + + template = { + "jobs": [ + { + "hadoop_job": { + "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/" + "hadoop-mapreduce-examples.jar", + "args": ["teragen", "1000", "hdfs:///gen/"], + }, + "step_id": "teragen", + }, + { + "hadoop_job": { + "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/" + "hadoop-mapreduce-examples.jar", + "args": ["terasort", "hdfs:///gen/", "hdfs:///sort/"], + }, + "step_id": "terasort", + "prerequisite_step_ids": ["teragen"], + }, + ], + "placement": { + "managed_cluster": { + "cluster_name": "my-managed-cluster", + "config": { + "gce_cluster_config": { + # Leave 'zone_uri' empty for 'Auto Zone Placement' + # 'zone_uri': '' + "zone_uri": "us-central1-a" + } + }, + } + }, + } + + # Submit the request to instantiate the workflow from an inline template. + operation = workflow_template_client.instantiate_inline_workflow_template( + request={"parent": parent, "template": template} + ) + operation.result() + + # Output a success message. 
+ print("Workflow ran successfully.") + # [END dataproc_instantiate_inline_workflow_template] + + +if __name__ == "__main__": + if len(sys.argv) < 3: + sys.exit( + "python instantiate_inline_workflow_template.py " + "project_id region" + ) + + project_id = sys.argv[1] + region = sys.argv[2] + instantiate_inline_workflow_template(project_id, region) diff --git a/dataproc/snippets/instantiate_inline_workflow_template_test.py b/dataproc/snippets/instantiate_inline_workflow_template_test.py new file mode 100644 index 000000000000..ef4f31a564c9 --- /dev/null +++ b/dataproc/snippets/instantiate_inline_workflow_template_test.py @@ -0,0 +1,31 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import instantiate_inline_workflow_template + + +PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] +REGION = "us-central1" + + +def test_workflows(capsys): + # Wrapper function for client library function + instantiate_inline_workflow_template.instantiate_inline_workflow_template( + PROJECT_ID, REGION + ) + + out, _ = capsys.readouterr() + assert "successfully" in out diff --git a/dataproc/snippets/list_clusters.py b/dataproc/snippets/list_clusters.py new file mode 100644 index 000000000000..0b36ba610096 --- /dev/null +++ b/dataproc/snippets/list_clusters.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Sample command-line program to list Cloud Dataproc clusters in a region. + +Example usage: +python list_clusters.py --project_id=my-project-id --region=global + +""" +import argparse + +from google.cloud import dataproc_v1 + + +# [START dataproc_list_clusters] +def list_clusters(dataproc, project, region): + """List the details of clusters in the region.""" + for cluster in dataproc.list_clusters( + request={"project_id": project, "region": region} + ): + print(("{} - {}".format(cluster.cluster_name, cluster.status.state.name))) + + +# [END dataproc_list_clusters] + + +def main(project_id, region): + + if region == "global": + # Use the default gRPC global endpoints. + dataproc_cluster_client = dataproc_v1.ClusterControllerClient() + else: + # Use a regional gRPC endpoint. 
See: + # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints + dataproc_cluster_client = dataproc_v1.ClusterControllerClient( + client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"} + ) + + list_clusters(dataproc_cluster_client, project_id, region) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=(argparse.RawDescriptionHelpFormatter) + ) + parser.add_argument("--project_id", help="Project ID to access.", required=True) + parser.add_argument("--region", help="Region of clusters to list.", required=True) + + args = parser.parse_args() + main(args.project_id, args.region) diff --git a/dataproc/snippets/noxfile_config.py b/dataproc/snippets/noxfile_config.py new file mode 100644 index 000000000000..646d77de8543 --- /dev/null +++ b/dataproc/snippets/noxfile_config.py @@ -0,0 +1,42 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Default TEST_CONFIG_OVERRIDE for python repos. + +# You can copy this file into your directory, then it will be imported from +# the noxfile.py. + +# The source of truth: +# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py + +TEST_CONFIG_OVERRIDE = { + # You can opt out from the test for specific Python versions. + "ignored_versions": ["2.7", "3.6"], + # Old samples are opted out of enforcing Python type hints + # All new samples should feature them + # "enforce_type_hints": True, + # An envvar key for determining the project id to use. Change it + # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a + # build specific Cloud project. You can also use your own string + # to use your own Cloud project. + # "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", + "gcloud_project_env": "BUILD_SPECIFIC_GCLOUD_PROJECT", + # If you need to use a specific version of pip, + # change pip_version_override to the string representation + # of the version number, for example, "20.2.4" + "pip_version_override": None, + # A dictionary you want to inject into your test. Don't put any + # secrets here. These values will override predefined values. + "envs": {}, +} diff --git a/dataproc/snippets/pyspark_sort.py b/dataproc/snippets/pyspark_sort.py new file mode 100644 index 000000000000..cecba896164a --- /dev/null +++ b/dataproc/snippets/pyspark_sort.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" Sample pyspark script to be uploaded to Cloud Storage and run on +Cloud Dataproc. + +Note this file is not intended to be run directly, but run inside a PySpark +environment. +""" + +# [START dataproc_pyspark_sort] +import pyspark + +sc = pyspark.SparkContext() +rdd = sc.parallelize(["Hello,", "world!", "dog", "elephant", "panther"]) +words = sorted(rdd.collect()) +print(words) +# [END dataproc_pyspark_sort] diff --git a/dataproc/snippets/pyspark_sort_gcs.py b/dataproc/snippets/pyspark_sort_gcs.py new file mode 100644 index 000000000000..1f07722e78b0 --- /dev/null +++ b/dataproc/snippets/pyspark_sort_gcs.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Sample pyspark script to be uploaded to Cloud Storage and run on +Cloud Dataproc. + +Note this file is not intended to be run directly, but run inside a PySpark +environment. + +This file demonstrates how to read from a GCS bucket. See README.md for more +information. +""" + +# [START dataproc_pyspark_sort_gcs] +import pyspark + +sc = pyspark.SparkContext() +rdd = sc.textFile("gs://path-to-your-GCS-file") +print(sorted(rdd.collect())) +# [END dataproc_pyspark_sort_gcs] diff --git a/dataproc/snippets/python-api-walkthrough.md b/dataproc/snippets/python-api-walkthrough.md new file mode 100644 index 000000000000..c5eb884a8f01 --- /dev/null +++ b/dataproc/snippets/python-api-walkthrough.md @@ -0,0 +1,157 @@ +# Use the Python Client Library to call Dataproc APIs + +Estimated completion time: + +## Overview + +This [Cloud Shell](https://cloud.google.com/shell/docs/) walkthrough leads you +through the steps to use the +[Cloud Client Libraries for Python](https://googleapis.github.io/google-cloud-python/latest/dataproc/index.html) +to programmatically interact with [Dataproc](https://cloud.google.com/dataproc/docs/). + +As you follow this walkthrough, you run Python code that calls +[Dataproc gRPC APIs](https://cloud.google.com/dataproc/docs/reference/rpc/) +to: + +* Create a Dataproc cluster +* Submit a PySpark word sort job to the cluster +* Delete the cluster after job completion + +## Using the walkthrough + +The `submit_job_to_cluster.py file` used in this walkthrough is opened in the +Cloud Shell editor when you launch the walkthrough. You can view +the code as your follow the walkthrough steps. + +**For more information**: See [Use the Cloud Client Libraries for Python](https://cloud.google.com/dataproc/docs/tutorials/python-library-example) for +an explanation of how the code works. + +**To reload this walkthrough:** Run the following command from the +`~/python-docs-samples/dataproc` directory in Cloud Shell: + + cloudshell launch-tutorial python-api-walkthrough.md + +**To copy and run commands**: Click the "Copy to Cloud Shell" button + () + on the side of a code box, then press `Enter` to run the command. + +## Prerequisites (1) + + + +1. Create or select a Google Cloud project to use for this + tutorial. + * + +1. 
Enable the Dataproc, Compute Engine, and Cloud Storage APIs in your project.

    ```bash
    gcloud services enable dataproc.googleapis.com \
      compute.googleapis.com \
      storage-component.googleapis.com \
      --project={{project_id}}
    ```

## Prerequisites (2)

1. This walkthrough uploads a PySpark file (`pyspark_sort.py`) to a [Cloud Storage bucket](https://cloud.google.com/storage/docs/key-terms#buckets) in your project.
    * You can use the [Cloud Storage browser page](https://console.cloud.google.com/storage/browser) in Google Cloud Console to view existing buckets in your project.

      **OR**

    * To create a new bucket, run the following command. Your bucket name must be unique.

          gsutil mb -p {{project-id}} gs://your-bucket-name

2. Set environment variables.
    * Set the name of your bucket.

          BUCKET=your-bucket-name

## Prerequisites (3)

1. Set up a Python [virtual environment](https://virtualenv.readthedocs.org/en/latest/).

    * Create the virtual environment.

          virtualenv ENV

    * Activate the virtual environment.

          source ENV/bin/activate

1. Install library dependencies.

       pip install -r requirements.txt

## Create a cluster and submit a job

1. Set a name for your new cluster.

       CLUSTER=new-cluster-name

1. Set a [region](https://cloud.google.com/compute/docs/regions-zones/#available) where your new cluster will be located. You can change the pre-set "us-central1" region before you copy and run the following command.

       REGION=us-central1

1. Run `submit_job_to_cluster.py` to create a new cluster and run the `pyspark_sort.py` job on the cluster.

       python submit_job_to_cluster.py \
         --project_id={{project-id}} \
         --region=$REGION \
         --cluster_name=$CLUSTER \
         --gcs_bucket=$BUCKET

## Job Output

Job output displayed in the Cloud Shell terminal shows cluster creation, job completion, sorted job output, and then deletion of the cluster.

```
Cluster created successfully: cluster-name.
...
Job finished successfully.
...
['Hello,', 'dog', 'elephant', 'panther', 'world!']
...
Cluster cluster-name successfully deleted.
```

## Congratulations on completing the Walkthrough!

---

### Next Steps:

* **View job details in the Cloud Console.** View job details by selecting the PySpark job name on the Dataproc [Jobs page](https://console.cloud.google.com/dataproc/jobs) in the Cloud Console.

* **Delete resources used in the walkthrough.** The `submit_job_to_cluster.py` code deletes the cluster that it created for this walkthrough.

  If you created a Cloud Storage bucket to use for this walkthrough, you can run the following command to delete the bucket (the bucket must be empty).

      gsutil rb gs://$BUCKET

  * You can run the following command to **delete the bucket and all objects within it. Note: the deleted objects cannot be recovered.**

        gsutil rm -r gs://$BUCKET

* **For more information.** See the [Dataproc documentation](https://cloud.google.com/dataproc/docs/) for API reference and product feature information.
diff --git a/dataproc/snippets/quickstart/quickstart.py b/dataproc/snippets/quickstart/quickstart.py new file mode 100644 index 000000000000..c9e73002665d --- /dev/null +++ b/dataproc/snippets/quickstart/quickstart.py @@ -0,0 +1,133 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START dataproc_quickstart] +""" +This quickstart sample walks a user through creating a Cloud Dataproc +cluster, submitting a PySpark job from Google Cloud Storage to the +cluster, reading the output of the job and deleting the cluster, all +using the Python client library. + +Usage: + python quickstart.py --project_id --region \ + --cluster_name --job_file_path +""" + +import argparse +import re + +from google.cloud import dataproc_v1 as dataproc +from google.cloud import storage + + +def quickstart(project_id, region, cluster_name, job_file_path): + # Create the cluster client. + cluster_client = dataproc.ClusterControllerClient( + client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)} + ) + + # Create the cluster config. + cluster = { + "project_id": project_id, + "cluster_name": cluster_name, + "config": { + "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"}, + "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"}, + }, + } + + # Create the cluster. + operation = cluster_client.create_cluster( + request={"project_id": project_id, "region": region, "cluster": cluster} + ) + result = operation.result() + + print("Cluster created successfully: {}".format(result.cluster_name)) + + # Create the job client. + job_client = dataproc.JobControllerClient( + client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)} + ) + + # Create the job config. + job = { + "placement": {"cluster_name": cluster_name}, + "pyspark_job": {"main_python_file_uri": job_file_path}, + } + + operation = job_client.submit_job_as_operation( + request={"project_id": project_id, "region": region, "job": job} + ) + response = operation.result() + + # Dataproc job output gets saved to the Google Cloud Storage bucket + # allocated to the job. Use a regex to obtain the bucket and blob info. + matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri) + + output = ( + storage.Client() + .get_bucket(matches.group(1)) + .blob(f"{matches.group(2)}.000000000") + .download_as_string() + ) + + print(f"Job finished successfully: {output}") + + # Delete the cluster once the job has terminated. 
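    # (In a production script, the job submission above would typically be
    # wrapped in a try/finally block so that this cleanup still runs if the
    # job fails; the quickstart keeps the happy path short, and
    # quickstart_test.py deletes any cluster left behind after a failed run.)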
+ operation = cluster_client.delete_cluster( + request={ + "project_id": project_id, + "region": region, + "cluster_name": cluster_name, + } + ) + operation.result() + + print("Cluster {} successfully deleted.".format(cluster_name)) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "--project_id", + type=str, + required=True, + help="Project to use for creating resources.", + ) + parser.add_argument( + "--region", + type=str, + required=True, + help="Region where the resources should live.", + ) + parser.add_argument( + "--cluster_name", + type=str, + required=True, + help="Name to use for creating a cluster.", + ) + parser.add_argument( + "--job_file_path", + type=str, + required=True, + help="Job in GCS to execute against the cluster.", + ) + + args = parser.parse_args() + quickstart(args.project_id, args.region, args.cluster_name, args.job_file_path) +# [END dataproc_quickstart] diff --git a/dataproc/snippets/quickstart/quickstart_test.py b/dataproc/snippets/quickstart/quickstart_test.py new file mode 100644 index 000000000000..4020ad80e981 --- /dev/null +++ b/dataproc/snippets/quickstart/quickstart_test.py @@ -0,0 +1,90 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import uuid + +import backoff +from google.api_core.exceptions import ServiceUnavailable +from google.cloud import dataproc_v1 as dataproc +from google.cloud import storage +import pytest + +import quickstart + + +PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] +REGION = "us-central1" +CLUSTER_NAME = "py-qs-test-{}".format(str(uuid.uuid4())) +STAGING_BUCKET = "py-dataproc-qs-bucket-{}".format(str(uuid.uuid4())) +JOB_FILE_NAME = "sum.py" +JOB_FILE_PATH = "gs://{}/{}".format(STAGING_BUCKET, JOB_FILE_NAME) +SORT_CODE = ( + "import pyspark\n" + "sc = pyspark.SparkContext()\n" + "rdd = sc.parallelize((1,2,3,4,5))\n" + "sum = rdd.reduce(lambda x, y: x + y)\n" +) + + +@pytest.fixture(autouse=True) +def blob(): + storage_client = storage.Client() + + @backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5) + def create_bucket(): + return storage_client.create_bucket(STAGING_BUCKET) + + bucket = create_bucket() + blob = bucket.blob(JOB_FILE_NAME) + blob.upload_from_string(SORT_CODE) + + yield + + blob.delete() + bucket.delete() + + +@pytest.fixture(autouse=True) +def cluster(): + yield + + # The quickstart sample deletes the cluster, but if the test fails + # before cluster deletion occurs, it can be manually deleted here. 
+ cluster_client = dataproc.ClusterControllerClient( + client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION)} + ) + + clusters = cluster_client.list_clusters( + request={"project_id": PROJECT_ID, "region": REGION} + ) + + for cluster in clusters: + if cluster.cluster_name == CLUSTER_NAME: + cluster_client.delete_cluster( + request={ + "project_id": PROJECT_ID, + "region": REGION, + "cluster_name": CLUSTER_NAME, + } + ) + + +def test_quickstart(capsys): + quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH) + out, _ = capsys.readouterr() + + assert "Cluster created successfully" in out + assert "Job finished successfully" in out + assert "successfully deleted" in out diff --git a/dataproc/snippets/requirements-test.txt b/dataproc/snippets/requirements-test.txt new file mode 100644 index 000000000000..0c53ad2e6787 --- /dev/null +++ b/dataproc/snippets/requirements-test.txt @@ -0,0 +1,2 @@ +pytest==7.2.0 +pytest-xdist==3.0.2 \ No newline at end of file diff --git a/dataproc/snippets/requirements.txt b/dataproc/snippets/requirements.txt new file mode 100644 index 000000000000..9414a8d384a9 --- /dev/null +++ b/dataproc/snippets/requirements.txt @@ -0,0 +1,8 @@ + +backoff==2.2.1 +grpcio==1.50.0 +google-auth==2.14.0 +google-auth-httplib2==0.1.0 +google-cloud==0.34.0 +google-cloud-storage==2.5.0 +google-cloud-dataproc==5.0.3 diff --git a/dataproc/snippets/single_job_workflow.py b/dataproc/snippets/single_job_workflow.py new file mode 100644 index 000000000000..6f9948f04f85 --- /dev/null +++ b/dataproc/snippets/single_job_workflow.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +r"""Sample Cloud Dataproc inline workflow to run a pyspark job on an ephermeral +cluster. 
+Example Usage to run the inline workflow on a managed cluster: +python single_job_workflow.py --project_id=$PROJECT --gcs_bucket=$BUCKET \ + --cluster_name=$CLUSTER --zone=$ZONE +Example Usage to run the inline workflow on a global region managed cluster: +python submit_job_to_cluster.py --project_id=$PROJECT --gcs_bucket=$BUCKET \ + --cluster_name=$CLUSTER --zone=$ZONE --global_region +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import os + +from google.cloud import dataproc_v1 +from google.cloud import storage +from google.cloud.dataproc_v1.gapic.transports import ( + workflow_template_service_grpc_transport, +) + + +DEFAULT_FILENAME = "pyspark_sort.py" +waiting_callback = False + + +def get_pyspark_file(pyspark_file=None): + if pyspark_file: + f = open(pyspark_file, "rb") + return f, os.path.basename(pyspark_file) + else: + """Gets the PySpark file from current directory.""" + current_dir = os.path.dirname(os.path.abspath(__file__)) + f = open(os.path.join(current_dir, DEFAULT_FILENAME), "rb") + return f, DEFAULT_FILENAME + + +def get_region_from_zone(zone): + try: + region_as_list = zone.split("-")[:-1] + return "-".join(region_as_list) + except (AttributeError, IndexError, ValueError): + raise ValueError("Invalid zone provided, please check your input.") + + +def upload_pyspark_file(project, bucket_name, filename, spark_file): + """Uploads the PySpark file in this directory to the configured input + bucket.""" + print("Uploading pyspark file to Cloud Storage.") + client = storage.Client(project=project) + bucket = client.get_bucket(bucket_name) + blob = bucket.blob(filename) + blob.upload_from_file(spark_file) + + +def run_workflow(dataproc, project, region, zone, bucket_name, filename, cluster_name): + + parent = "projects/{}/regions/{}".format(project, region) + zone_uri = "https://www.googleapis.com/compute/v1/projects/{}/zones/{}".format( + project, zone + ) + + workflow_data = { + "placement": { + "managed_cluster": { + "cluster_name": cluster_name, + "config": { + "gce_cluster_config": {"zone_uri": zone_uri}, + "master_config": { + "num_instances": 1, + "machine_type_uri": "n1-standard-1", + }, + "worker_config": { + "num_instances": 2, + "machine_type_uri": "n1-standard-1", + }, + }, + } + }, + "jobs": [ + { + "pyspark_job": { + "main_python_file_uri": "gs://{}/{}".format(bucket_name, filename) + }, + "step_id": "pyspark-job", + } + ], + } + + workflow = dataproc.instantiate_inline_workflow_template( + request={"parent": parent, "template": workflow_data} + ) + + workflow.add_done_callback(callback) + global waiting_callback + waiting_callback = True + + +def callback(operation_future): + # Reset global when callback returns. + global waiting_callback + waiting_callback = False + + +def wait_for_workflow_end(): + """Wait for cluster creation.""" + print("Waiting for workflow completion ...") + print( + "Workflow and job progress, and job driver output available from: " + "https://console.cloud.google.com/dataproc/workflows/" + ) + + while True: + if not waiting_callback: + print("Workflow completed.") + break + + +def main( + project_id, + zone, + cluster_name, + bucket_name, + pyspark_file=None, + create_new_cluster=True, + global_region=True, +): + + # [START dataproc_get_workflow_template_client] + if global_region: + region = "global" + # Use the default gRPC global endpoints. 
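        # (With no client_options, the client talks to the global endpoint,
        # dataproc.googleapis.com:443; the regional branch below instead builds
        # a transport against "<region>-dataproc.googleapis.com:443".)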
+ dataproc_workflow_client = dataproc_v1.WorkflowTemplateServiceClient() + else: + region = get_region_from_zone(zone) + # Use a regional gRPC endpoint. See: + # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints + client_transport = workflow_template_service_grpc_transport.WorkflowTemplateServiceGrpcTransport( + address="{}-dataproc.googleapis.com:443".format(region) + ) + dataproc_workflow_client = dataproc_v1.WorkflowTemplateServiceClient( + client_transport + ) + # [END dataproc_get_workflow_template_client] + + try: + spark_file, spark_filename = get_pyspark_file(pyspark_file) + upload_pyspark_file(project_id, bucket_name, spark_filename, spark_file) + + run_workflow( + dataproc_workflow_client, + project_id, + region, + zone, + bucket_name, + spark_filename, + cluster_name, + ) + wait_for_workflow_end() + + finally: + spark_file.close() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=(argparse.RawDescriptionHelpFormatter) + ) + parser.add_argument( + "--project_id", help="Project ID you want to access.", required=True + ) + parser.add_argument( + "--zone", help="Zone to create clusters in/connect to", required=True + ) + parser.add_argument( + "--cluster_name", help="Name of the cluster to create/connect to", required=True + ) + parser.add_argument( + "--gcs_bucket", help="Bucket to upload Pyspark file to", required=True + ) + parser.add_argument( + "--pyspark_file", help="Pyspark filename. Defaults to pyspark_sort.py" + ) + parser.add_argument( + "--global_region", + action="store_true", + help="If cluster is in the global region", + ) + + args = parser.parse_args() + main( + args.project_id, + args.zone, + args.cluster_name, + args.gcs_bucket, + args.pyspark_file, + ) diff --git a/dataproc/snippets/submit_job.py b/dataproc/snippets/submit_job.py new file mode 100644 index 000000000000..d7761b734c8c --- /dev/null +++ b/dataproc/snippets/submit_job.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python + +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This sample walks a user through submitting a Spark job using the Dataproc +# client library. + +# Usage: +# python submit_job.py --project_id --region \ +# --cluster_name + +# [START dataproc_submit_job] +import re + +# [END dataproc_submit_job] +import sys + +# [START dataproc_submit_job] + +from google.cloud import dataproc_v1 as dataproc +from google.cloud import storage + + +def submit_job(project_id, region, cluster_name): + # Create the job client. + job_client = dataproc.JobControllerClient( + client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)} + ) + + # Create the job config. 'main_jar_file_uri' can also be a + # Google Cloud Storage URL. 
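    # For example, a driver jar staged in Cloud Storage could be referenced
    # instead of the on-cluster examples jar (hypothetical path):
    #   "spark_job": {"main_jar_file_uri": "gs://your-bucket/your-spark-job.jar",
    #                 "args": ["1000"]},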
+ job = { + "placement": {"cluster_name": cluster_name}, + "spark_job": { + "main_class": "org.apache.spark.examples.SparkPi", + "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"], + "args": ["1000"], + }, + } + + operation = job_client.submit_job_as_operation( + request={"project_id": project_id, "region": region, "job": job} + ) + response = operation.result() + + # Dataproc job output gets saved to the Google Cloud Storage bucket + # allocated to the job. Use a regex to obtain the bucket and blob info. + matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri) + + output = ( + storage.Client() + .get_bucket(matches.group(1)) + .blob(f"{matches.group(2)}.000000000") + .download_as_string() + ) + + print(f"Job finished successfully: {output}") + + +# [END dataproc_submit_job] + + +if __name__ == "__main__": + if len(sys.argv) < 3: + sys.exit("python submit_job.py project_id region cluster_name") + + project_id = sys.argv[1] + region = sys.argv[2] + cluster_name = sys.argv[3] + submit_job(project_id, region, cluster_name) diff --git a/dataproc/snippets/submit_job_test.py b/dataproc/snippets/submit_job_test.py new file mode 100644 index 000000000000..c2e9bed9e7f8 --- /dev/null +++ b/dataproc/snippets/submit_job_test.py @@ -0,0 +1,74 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import uuid + +from google.api_core.exceptions import NotFound +from google.cloud import dataproc_v1 as dataproc +import pytest + +import submit_job + + +PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] +REGION = "us-central1" +CLUSTER_NAME = "py-sj-test-{}".format(str(uuid.uuid4())) +CLUSTER = { + "project_id": PROJECT_ID, + "cluster_name": CLUSTER_NAME, + "config": { + "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"}, + "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"}, + }, +} + + +@pytest.fixture(autouse=True) +def setup_teardown(): + try: + cluster_client = dataproc.ClusterControllerClient( + client_options={ + "api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION) + } + ) + + # Create the cluster. 
+ operation = cluster_client.create_cluster( + request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER} + ) + operation.result() + + yield + + finally: + try: + operation = cluster_client.delete_cluster( + request={ + "project_id": PROJECT_ID, + "region": REGION, + "cluster_name": CLUSTER_NAME, + } + ) + operation.result() + + except NotFound: + print("Cluster already deleted") + + +def test_submit_job(capsys): + submit_job.submit_job(PROJECT_ID, REGION, CLUSTER_NAME) + out, _ = capsys.readouterr() + + assert "Job finished successfully" in out diff --git a/dataproc/snippets/submit_job_to_cluster.py b/dataproc/snippets/submit_job_to_cluster.py new file mode 100644 index 000000000000..506174df825f --- /dev/null +++ b/dataproc/snippets/submit_job_to_cluster.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START dataproc_quickstart] +""" +Command-line program to create a Dataproc cluster, +run a PySpark job located in Cloud Storage on the cluster, +then delete the cluster after the job completes. + +Usage: + python submit_job_to_cluster --project_id --region \ + --cluster_name --job_file_path +""" + +import argparse +import os +import re + +from google.cloud import dataproc_v1 +from google.cloud import storage + +DEFAULT_FILENAME = "pyspark_sort.py" +waiting_callback = False + + +def get_pyspark_file(pyspark_file=None): + if pyspark_file: + f = open(pyspark_file, "rb") + return f, os.path.basename(pyspark_file) + else: + """Gets the PySpark file from current directory.""" + current_dir = os.path.dirname(os.path.abspath(__file__)) + f = open(os.path.join(current_dir, DEFAULT_FILENAME), "rb") + return f, DEFAULT_FILENAME + + +def get_region_from_zone(zone): + try: + region_as_list = zone.split("-")[:-1] + return "-".join(region_as_list) + except (AttributeError, IndexError, ValueError): + raise ValueError("Invalid zone provided, please check your input.") + + +def upload_pyspark_file(project, bucket_name, filename, spark_file): + """Uploads the PySpark file in this directory to the configured input + bucket.""" + print("Uploading pyspark file to Cloud Storage.") + client = storage.Client(project=project) + bucket = client.get_bucket(bucket_name) + blob = bucket.blob(filename) + blob.upload_from_file(spark_file) + + +def download_output(project, cluster_id, output_bucket, job_id): + """Downloads the output file from Cloud Storage and returns it as a + string.""" + print("Downloading output file.") + client = storage.Client(project=project) + bucket = client.get_bucket(output_bucket) + output_blob = "google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000".format( + cluster_id, job_id + ) + return bucket.blob(output_blob).download_as_string() + + +# [START dataproc_create_cluster] +def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file): + # Create the cluster client. 
+ cluster_client = dataproc_v1.ClusterControllerClient( + client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)} + ) + + # Create the cluster config. + cluster = { + "project_id": project_id, + "cluster_name": cluster_name, + "config": { + "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"}, + "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"}, + }, + } + + # Create the cluster. + operation = cluster_client.create_cluster( + request={"project_id": project_id, "region": region, "cluster": cluster} + ) + result = operation.result() + + print("Cluster created successfully: {}".format(result.cluster_name)) + +# [END dataproc_create_cluster] + + spark_file, spark_filename = get_pyspark_file(pyspark_file) + upload_pyspark_file(project_id, gcs_bucket, spark_filename, spark_file) + +# [START dataproc_submit_job] + # Create the job client. + job_client = dataproc_v1.JobControllerClient( + client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)} + ) + + # Create the job config. + job = { + "placement": {"cluster_name": cluster_name}, + "pyspark_job": {"main_python_file_uri": "gs://{}/{}".format(gcs_bucket, spark_filename)}, + } + + operation = job_client.submit_job_as_operation( + request={"project_id": project_id, "region": region, "job": job} + ) + response = operation.result() + + # Dataproc job output is saved to the Cloud Storage bucket + # allocated to the job. Use regex to obtain the bucket and blob info. + matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri) + + output = ( + storage.Client() + .get_bucket(matches.group(1)) + .blob(f"{matches.group(2)}.000000000") + .download_as_string() + ) + + print(f"Job finished successfully: {output}\r\n") + # [END dataproc_submit_job] + + # [START dataproc_delete_cluster] + # Delete the cluster once the job has terminated. + operation = cluster_client.delete_cluster( + request={ + "project_id": project_id, + "region": region, + "cluster_name": cluster_name, + } + ) + operation.result() + + print("Cluster {} successfully deleted.".format(cluster_name)) +# [END dataproc_delete_cluster] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "--project_id", + type=str, + required=True, + help="Project to use for creating resources.", + ) + parser.add_argument( + "--region", + type=str, + required=True, + help="Region where the resources should live.", + ) + parser.add_argument( + "--cluster_name", + type=str, + required=True, + help="Name to use for creating a cluster.", + ) + + parser.add_argument( + "--gcs_bucket", help="Bucket to upload Pyspark file to", required=True + ) + + parser.add_argument( + "--pyspark_file", help="Pyspark filename. Defaults to pyspark_sort.py" + ) + + args = parser.parse_args() + quickstart(args.project_id, args.region, args.cluster_name, args.gcs_bucket, args.pyspark_file) +# [END dataproc_quickstart] diff --git a/dataproc/snippets/update_cluster.py b/dataproc/snippets/update_cluster.py new file mode 100644 index 000000000000..bae6eee2fd64 --- /dev/null +++ b/dataproc/snippets/update_cluster.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python + +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This sample walks a user through updating the number of worker nodes in a
# Cloud Dataproc cluster using the Dataproc client library.

# Usage:
# python update_cluster.py project_id region cluster_name new_num_instances

import sys

# [START dataproc_update_cluster]
from google.cloud import dataproc_v1 as dataproc


def update_cluster(project_id, region, cluster_name, new_num_instances):
    """This sample walks a user through updating a Cloud Dataproc cluster
    using the Python client library.

    Args:
        project_id (str): Project the cluster belongs to.
        region (str): Region where the cluster lives.
        cluster_name (str): Name of the cluster to update.
        new_num_instances (int): New number of worker instances for the cluster.
    """

    # Create a client with the endpoint set to the desired cluster region.
    client = dataproc.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Get cluster you wish to update.
    cluster = client.get_cluster(
        project_id=project_id, region=region, cluster_name=cluster_name
    )

    # Build an update mask listing the field to change: the worker count.
    mask = {"paths": ["config.worker_config.num_instances"]}

    # Set the new number of worker instances on the cluster config.
    cluster.config.worker_config.num_instances = new_num_instances

    # Send the update request.
    operation = client.update_cluster(
        project_id=project_id,
        region=region,
        cluster=cluster,
        cluster_name=cluster_name,
        update_mask=mask,
    )

    # Output a success message.
    updated_cluster = operation.result()
    print(f"Cluster was updated successfully: {updated_cluster.cluster_name}")


# [END dataproc_update_cluster]


if __name__ == "__main__":
    if len(sys.argv) < 5:
        sys.exit("python update_cluster.py project_id region cluster_name new_num_instances")

    project_id = sys.argv[1]
    region = sys.argv[2]
    cluster_name = sys.argv[3]
    new_num_instances = int(sys.argv[4])
    update_cluster(project_id, region, cluster_name, new_num_instances)
diff --git a/dataproc/snippets/update_cluster_test.py b/dataproc/snippets/update_cluster_test.py new file mode 100644 index 000000000000..9a9d66c97978 --- /dev/null +++ b/dataproc/snippets/update_cluster_test.py @@ -0,0 +1,86 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This test walks a user through updating the number of worker nodes in a
# Cloud Dataproc cluster using the Dataproc client library.
+ + +import os +import uuid + +from google.api_core.exceptions import NotFound +from google.cloud.dataproc_v1.services.cluster_controller.client import ( + ClusterControllerClient, +) +import pytest + +import update_cluster + + +PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] +REGION = "us-central1" +CLUSTER_NAME = f"py-cc-test-{str(uuid.uuid4())}" +NEW_NUM_INSTANCES = 5 +CLUSTER = { + "project_id": PROJECT_ID, + "cluster_name": CLUSTER_NAME, + "config": { + "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"}, + "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"}, + }, +} + + +@pytest.fixture +def cluster_client(): + cluster_client = ClusterControllerClient( + client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION)} + ) + return cluster_client + + +@pytest.fixture(autouse=True) +def setup_teardown(cluster_client): + try: + # Create the cluster. + operation = cluster_client.create_cluster( + request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER} + ) + operation.result() + + yield + finally: + try: + operation = cluster_client.delete_cluster( + request={ + "project_id": PROJECT_ID, + "region": REGION, + "cluster_name": CLUSTER_NAME, + } + ) + operation.result() + except NotFound: + print("Cluster already deleted") + + +def test_update_cluster(capsys, cluster_client: ClusterControllerClient): + # Wrapper function for client library function + update_cluster.update_cluster(PROJECT_ID, REGION, CLUSTER_NAME, NEW_NUM_INSTANCES) + new_num_cluster = cluster_client.get_cluster( + project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME + ) + + out, _ = capsys.readouterr() + assert CLUSTER_NAME in out + assert new_num_cluster.config.worker_config.num_instances == NEW_NUM_INSTANCES