Update Dataproc samples. (#2158)
* Update requirements.txt

* Update python-api-walkthrough.md

* Update submit_job_to_cluster.py

* Update list_clusters.py
aman-ebay authored and tswast committed May 20, 2019
1 parent ff8f235 commit 1bddafd
Showing 6 changed files with 426 additions and 194 deletions.
38 changes: 16 additions & 22 deletions dataproc/README.md
@@ -1,4 +1,4 @@
# Cloud Dataproc API Example
# Cloud Dataproc API Examples

[![Open in Cloud Shell][shell_img]][shell_link]

@@ -7,21 +7,20 @@

Sample command-line programs for interacting with the Cloud Dataproc API.


Please see [the tutorial on the using the Dataproc API with the Python client
See [the tutorial on using the Dataproc API with the Python client
library](https://cloud.google.com/dataproc/docs/tutorials/python-library-example)
for more information.
for a walkthrough you can run to try out the Cloud Dataproc API sample code.

Note that while this sample demonstrates interacting with Dataproc via the API, the functionality
demonstrated here could also be accomplished using the Cloud Console or the gcloud CLI.
Note that while these samples demonstrate interacting with Dataproc via the API, the same functionality can also be accomplished using the Cloud Console or the gcloud CLI.

`list_clusters.py` is a simple command-line program to demonstrate connecting to the
Dataproc API and listing the clusters in a region
`list_clusters.py` is a simple command-line program to demonstrate connecting to the Cloud Dataproc API and listing the clusters in a region.

`create_cluster_and_submit_job.py` demonstrates how to create a cluster, submit the
`submit_job_to_cluster.py` demonstrates how to create a cluster, submit the
`pyspark_sort.py` job, download the output from Google Cloud Storage, and output the result.

`pyspark_sort.py_gcs` is the asme as `pyspark_sort.py` but demonstrates
`single_job_workflow.py` uses the Cloud Dataproc InstantiateInlineWorkflowTemplate API to create an ephemeral cluster, run a job, and delete the cluster, all with a single API request.

`pyspark_sort.py_gcs` is the same as `pyspark_sort.py` but demonstrates
reading from a GCS bucket.
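
As a rough illustration (not the sample file itself), reading a text file from a GCS bucket in PySpark can look like the following sketch; the `gs://your-bucket/input/` path is a placeholder:

    import pyspark

    sc = pyspark.SparkContext()
    # Dataproc clusters include the GCS connector, so gs:// URIs resolve directly.
    lines = sc.textFile("gs://your-bucket/input/")
    print(sorted(lines.collect()))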

## Prerequisites to run locally:
@@ -59,32 +58,27 @@ To run list_clusters.py:

    python list_clusters.py $GOOGLE_CLOUD_PROJECT --region=$REGION

`submit_job_to_cluster.py` can create the Dataproc cluster, or use an existing one.
If you'd like to create a cluster ahead of time, either use the
[Cloud Console](console.cloud.google.com) or run:
`submit_job_to_cluster.py` can create the Dataproc cluster or use an existing cluster. To create a cluster before running the code, you can use the [Cloud Console](https://console.cloud.google.com) or run:

    gcloud dataproc clusters create your-cluster-name

To run submit_job_to_cluster.py, first create a GCS bucket for Dataproc to stage files, from the Cloud Console or with
gsutil:
To run submit_job_to_cluster.py, first create a GCS bucket (used by Cloud Dataproc to stage files) from the Cloud Console or with gsutil:

    gsutil mb gs://<your-staging-bucket-name>
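
If you prefer to create the bucket from Python, here is a minimal sketch using the `google-cloud-storage` client already listed in `requirements.txt` (the project and bucket names are placeholders):

    from google.cloud import storage

    client = storage.Client(project="your-project-id")
    # Bucket names must be globally unique.
    client.create_bucket("your-staging-bucket-name")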

Set the environment variable's name:
Next, set the following environment variables:

    BUCKET=your-staging-bucket
    CLUSTER=your-cluster-name

Then, if you want to rely on an existing cluster, run:
Then, if you want to use an existing cluster, run:

    python submit_job_to_cluster.py --project_id=$GOOGLE_CLOUD_PROJECT --zone=us-central1-b --cluster_name=$CLUSTER --gcs_bucket=$BUCKET

Otherwise, if you want the script to create a new cluster for you:
Alternatively, to create a new cluster, which will be deleted at the end of the job, run:

    python submit_job_to_cluster.py --project_id=$GOOGLE_CLOUD_PROJECT --zone=us-central1-b --cluster_name=$CLUSTER --gcs_bucket=$BUCKET --create_new_cluster

This will setup a cluster, upload the PySpark file, submit the job, print the result, then
delete the cluster.
The script will set up a cluster, upload the PySpark file, submit the job, print the result, and then, if it created the cluster, delete it.
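
For orientation, the core job-submission call in `submit_job_to_cluster.py` boils down to roughly the following sketch (a global-region client; the project, cluster, and bucket names are placeholders):

    from google.cloud import dataproc_v1

    job_client = dataproc_v1.JobControllerClient()
    job_details = {
        'placement': {'cluster_name': 'your-cluster-name'},
        'pyspark_job': {
            'main_python_file_uri': 'gs://your-staging-bucket/pyspark_sort.py'
        },
    }
    # Submit the job and print the server-assigned job ID.
    result = job_client.submit_job(
        project_id='your-project-id', region='global', job=job_details)
    print(result.reference.job_id)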

You can optionally specify a `--pyspark_file` argument to change from the default
`pyspark_sort.py` included in this script to a new script.
Optionally, you can pass the `--pyspark_file` argument to replace the default `pyspark_sort.py` included in this script with your own script.
53 changes: 29 additions & 24 deletions dataproc/list_clusters.py
@@ -10,49 +10,54 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sample command-line program to list Cloud Dataproc clusters in a region.
""" Sample command-line program for listing Google Dataproc Clusters
"""
Example usage:
python list_clusters.py --project_id=my-project-id --region=global
"""
import argparse

import googleapiclient.discovery
from google.cloud import dataproc_v1
from google.cloud.dataproc_v1.gapic.transports import (
    cluster_controller_grpc_transport)


# [START dataproc_list_clusters]
def list_clusters(dataproc, project, region):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
        region=region).execute()
    return result
    """List the details of clusters in the region."""
    for cluster in dataproc.list_clusters(project, region):
        print(('{} - {}'.format(cluster.cluster_name,
                                cluster.status.State.Name(
                                    cluster.status.state))))
# [END dataproc_list_clusters]


# [START dataproc_get_client]
def get_client():
"""Builds a client to the dataproc API."""
dataproc = googleapiclient.discovery.build('dataproc', 'v1')
return dataproc
# [END dataproc_get_client]
def main(project_id, region):

if region == 'global':
# Use the default gRPC global endpoints.
dataproc_cluster_client = dataproc_v1.ClusterControllerClient()
else:
# Use a regional gRPC endpoint. See:
# https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
client_transport = (
cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
address='{}-dataproc.googleapis.com:443'.format(region)))
dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
client_transport)

def main(project_id, region):
dataproc = get_client()
result = list_clusters(dataproc, project_id, region)
print(result)
list_clusters(dataproc_cluster_client, project_id, region)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
        description=__doc__, formatter_class=(
            argparse.RawDescriptionHelpFormatter))
    parser.add_argument(
        'project_id', help='Project ID you want to access.'),
    # Sets the region to "global" if it's not provided
    # Note: sub-regions (e.g.: us-central1-a/b) are currently not supported
        '--project_id', help='Project ID to access.', required=True)
    parser.add_argument(
        '--region', default='global', help='Region to list clusters')
        '--region', help='Region of clusters to list.', required=True)

    args = parser.parse_args()
    main(args.project_id, args.region)
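
For context, here is a minimal interactive sketch (not part of this commit) of the same regional-endpoint pattern shown above, assuming application default credentials and placeholder project and region values:

    from google.cloud import dataproc_v1
    from google.cloud.dataproc_v1.gapic.transports import (
        cluster_controller_grpc_transport)

    # Build a client bound to the regional endpoint, then list its clusters.
    transport = cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
        address='us-central1-dataproc.googleapis.com:443')
    client = dataproc_v1.ClusterControllerClient(transport)
    for cluster in client.list_clusters('your-project-id', 'us-central1'):
        print(cluster.cluster_name)
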
6 changes: 3 additions & 3 deletions dataproc/python-api-walkthrough.md
@@ -121,7 +121,7 @@ Job output in Cloud Shell shows cluster creation, job submission,
...
Creating cluster...
Cluster created.
Uploading pyspark file to GCS
Uploading pyspark file to Cloud Storage
new-cluster-name - RUNNING
Submitted job ID ...
Waiting for job to finish...
@@ -140,12 +140,12 @@ Job output in Cloud Shell shows cluster creation, job submission,
### Next Steps:
* **View job details from the Console.** View job details by selecting the
  PySpark job from the Cloud Dataproc
[Jobs page](https://console.cloud.google.com/dataproc/jobs)
in the Google Cloud Platform Console.
* **Delete resources used in the walkthrough.**
The `submit_job.py` job deletes the cluster that it created for this
The `submit_job_to_cluster.py` job deletes the cluster that it created for this
walkthrough.
If you created a bucket to use for this walkthrough,
3 changes: 2 additions & 1 deletion dataproc/requirements.txt
@@ -1,5 +1,6 @@
google-api-python-client==1.7.8
grpcio>=1.2.0
google-auth==1.6.2
google-auth-httplib2==0.0.3
google-cloud==0.34.0
google-cloud-storage==1.13.2
google-cloud-dataproc==0.3.1
208 changes: 208 additions & 0 deletions dataproc/single_job_workflow.py
@@ -0,0 +1,208 @@
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
r"""Sample Cloud Dataproc inline workflow to run a pyspark job on an ephermeral
cluster.
Example Usage to run the inline workflow on a managed cluster:
python single_job_workflow.py --project_id=$PROJECT --gcs_bucket=$BUCKET \
--cluster_name=$CLUSTER --zone=$ZONE
Example Usage to run the inline workflow on a global region managed cluster:
python single_job_workflow.py --project_id=$PROJECT --gcs_bucket=$BUCKET \
--cluster_name=$CLUSTER --zone=$ZONE --global_region
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import os

from google.cloud import dataproc_v1
from google.cloud.dataproc_v1.gapic.transports import (
    workflow_template_service_grpc_transport)
from google.cloud import storage

DEFAULT_FILENAME = "pyspark_sort.py"
waiting_callback = False


def get_pyspark_file(pyspark_file=None):
    if pyspark_file:
        f = open(pyspark_file, "rb")
        return f, os.path.basename(pyspark_file)
    else:
        """Gets the PySpark file from current directory."""
        current_dir = os.path.dirname(os.path.abspath(__file__))
        f = open(os.path.join(current_dir, DEFAULT_FILENAME), "rb")
        return f, DEFAULT_FILENAME


def get_region_from_zone(zone):
    try:
        region_as_list = zone.split("-")[:-1]
        return "-".join(region_as_list)
    except (AttributeError, IndexError, ValueError):
        raise ValueError("Invalid zone provided, please check your input.")
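    # Example (illustrative): get_region_from_zone("us-central1-b") returns
    # "us-central1", i.e. the zone name minus its final "-<letter>" component.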


def upload_pyspark_file(project, bucket_name, filename, spark_file):
"""Uploads the PySpark file in this directory to the configured input
bucket."""
print("Uploading pyspark file to Cloud Storage.")
client = storage.Client(project=project)
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(filename)
blob.upload_from_file(spark_file)


def run_workflow(dataproc, project, region, zone, bucket_name, filename,
                 cluster_name):

    parent = "projects/{}/regions/{}".format(project, region)
    zone_uri = ("https://www.googleapis.com/compute/v1/projects/{}/zones/{}"
                .format(project, zone))
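
    # The workflow template is defined inline below: a managed (ephemeral)
    # cluster plus a single PySpark step that runs the uploaded file.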

    workflow_data = {
        "placement": {
            "managed_cluster": {
                "cluster_name": cluster_name,
                "config": {
                    "gce_cluster_config": {"zone_uri": zone_uri},
                    "master_config": {
                        "num_instances": 1,
                        "machine_type_uri": "n1-standard-1",
                    },
                    "worker_config": {
                        "num_instances": 2,
                        "machine_type_uri": "n1-standard-1",
                    },
                },
            }
        },
        "jobs": [
            {
                "pyspark_job": {
                    "main_python_file_uri": "gs://{}/{}".format(
                        bucket_name, filename)
                },
                "step_id": "pyspark-job",
            }
        ],
    }

    workflow = dataproc.instantiate_inline_workflow_template(parent,
                                                              workflow_data)

    workflow.add_done_callback(callback)
    global waiting_callback
    waiting_callback = True
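    # instantiate_inline_workflow_template returns a long-running operation;
    # completion is signaled through callback(), defined below.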


def callback(operation_future):
    # Reset global when callback returns.
    global waiting_callback
    waiting_callback = False


def wait_for_workflow_end():
"""Wait for cluster creation."""
print("Waiting for workflow completion ...")
print("Workflow and job progress, and job driver output available from: "
"https://console.cloud.google.com/dataproc/workflows/")

while True:
if not waiting_callback:
print("Workflow completed.")
break


def main(
    project_id,
    zone,
    cluster_name,
    bucket_name,
    pyspark_file=None,
    create_new_cluster=True,
    global_region=True,
):

    # [START dataproc_get_workflow_template_client]
    if global_region:
        region = "global"
        # Use the default gRPC global endpoints.
        dataproc_workflow_client = dataproc_v1.WorkflowTemplateServiceClient()
    else:
        region = get_region_from_zone(zone)
        # Use a regional gRPC endpoint. See:
        # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
        client_transport = (workflow_template_service_grpc_transport
                            .WorkflowTemplateServiceGrpcTransport(
                                address="{}-dataproc.googleapis.com:443"
                                .format(region)))
        dataproc_workflow_client = dataproc_v1.WorkflowTemplateServiceClient(
            client_transport
        )
    # [END dataproc_get_workflow_template_client]

    try:
        spark_file, spark_filename = get_pyspark_file(pyspark_file)
        upload_pyspark_file(project_id, bucket_name, spark_filename,
                            spark_file)

        run_workflow(
            dataproc_workflow_client,
            project_id,
            region,
            zone,
            bucket_name,
            spark_filename,
            cluster_name
        )
        wait_for_workflow_end()

    finally:
        spark_file.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=(argparse
                                              .RawDescriptionHelpFormatter))
    parser.add_argument(
        "--project_id", help="Project ID you want to access.", required=True
    )
    parser.add_argument(
        "--zone", help="Zone to create clusters in/connect to", required=True
    )
    parser.add_argument(
        "--cluster_name", help="Name of the cluster to create/connect to",
        required=True
    )
    parser.add_argument(
        "--gcs_bucket", help="Bucket to upload Pyspark file to", required=True
    )
    parser.add_argument(
        "--pyspark_file", help="Pyspark filename. Defaults to pyspark_sort.py"
    )
    parser.add_argument("--global_region",
                        action="store_true",
                        help="If cluster is in the global region")

    args = parser.parse_args()
    main(
        args.project_id,
        args.zone,
        args.cluster_name,
        args.gcs_bucket,
        args.pyspark_file,
    )
