Skip to content

Commit

Permalink
dataflow: wait for jobs to be cancellable in tests for run_template. (#…
Browse files Browse the repository at this point in the history
…2829)

Co-authored-by: Leah E. Cole <6719667+leahecole@users.noreply.github.com>
  • Loading branch information
davidcavazos and leahecole committed Feb 12, 2020
1 parent 47af18d commit 5ca361b
Showing 1 changed file with 22 additions and 8 deletions.
30 changes: 22 additions & 8 deletions dataflow/run_template/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import os
import pytest
import time
import uuid

from datetime import datetime
from googleapiclient.discovery import build
Expand All @@ -41,17 +42,22 @@ def app():
return flask.Flask(__name__)


def unique_job_name(label):
return datetime.now().strftime('{}-%Y%m%d-%H%M%S-{}'.format(
label, uuid.uuid4().hex))


def test_run_template_python_empty_args(app):
project = PROJECT
job = datetime.now().strftime('test_run_template_python-%Y%m%d-%H%M%S')
job = unique_job_name('test_run_template_empty')
template = 'gs://dataflow-templates/latest/Word_Count'
with pytest.raises(HttpError):
main.run(project, job, template)


def test_run_template_python(app):
project = PROJECT
job = datetime.now().strftime('test_run_template_python-%Y%m%d-%H%M%S')
job = unique_job_name('test_run_template_python')
template = 'gs://dataflow-templates/latest/Word_Count'
parameters = {
'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
Expand All @@ -70,7 +76,7 @@ def test_run_template_http_empty_args(app):
def test_run_template_http_url(app):
args = {
'project': PROJECT,
'job': datetime.now().strftime('test_run_template_url-%Y%m%d-%H%M%S'),
'job': unique_job_name('test_run_template_url'),
'template': 'gs://dataflow-templates/latest/Word_Count',
'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET),
Expand All @@ -84,7 +90,7 @@ def test_run_template_http_url(app):
def test_run_template_http_data(app):
args = {
'project': PROJECT,
'job': datetime.now().strftime('test_run_template_data-%Y%m%d-%H%M%S'),
'job': unique_job_name('test_run_template_data'),
'template': 'gs://dataflow-templates/latest/Word_Count',
'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET),
Expand All @@ -98,7 +104,7 @@ def test_run_template_http_data(app):
def test_run_template_http_json(app):
args = {
'project': PROJECT,
'job': datetime.now().strftime('test_run_template_json-%Y%m%d-%H%M%S'),
'job': unique_job_name('test_run_template_json'),
'template': 'gs://dataflow-templates/latest/Word_Count',
'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt',
'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET),
Expand All @@ -110,9 +116,17 @@ def test_run_template_http_json(app):


def dataflow_jobs_cancel(job_id):
# Wait time until a job can be cancelled, as a best effort.
# If it fails to be cancelled, the job will run for ~8 minutes.
time.sleep(5) # seconds
# Wait time until the job can be cancelled.
state = None
while state != 'JOB_STATE_RUNNING':
job = dataflow.projects().jobs().get(
projectId=PROJECT,
jobId=job_id
).execute()
state = job['currentState']
time.sleep(1)

# Cancel the Dataflow job.
request = dataflow.projects().jobs().update(
projectId=PROJECT,
jobId=job_id,
Expand Down

0 comments on commit 5ca361b

Please sign in to comment.