Skip to content

Commit

Permalink
[EWT-361] Fix broken regex pattern for extracting dataflow job id (tw…
Browse files Browse the repository at this point in the history
…itter-forks#51)

Update the dataflow URL regex as per AIRFLOW-9323

Co-authored-by: Rajat Srivastava <rajats@twitter.com>
  • Loading branch information
rajatsri28 and Rajat Srivastava authored Jun 18, 2020
1 parent 7b52a71 commit b210dbf
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions airflow/contrib/hooks/gcp_dataflow_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
DEFAULT_DATAFLOW_LOCATION = 'us-central1'


JOB_ID_PATTERN = re.compile(
r'Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]'
)


class _DataflowJob(LoggingMixin):
def __init__(self, dataflow, project_number, name, location, poll_sleep=10,
job_id=None, num_retries=None):
Expand Down Expand Up @@ -128,24 +133,25 @@ def __init__(self, cmd):

def _line(self, fd):
if fd == self._proc.stderr.fileno():
line = b''.join(self._proc.stderr.readlines())
line = self._proc.stderr.readline().decode()
if line:
self.log.warning(line[:-1])
self.log.warning(line.rstrip("\n"))
return line
if fd == self._proc.stdout.fileno():
line = b''.join(self._proc.stdout.readlines())
line = self._proc.stdout.readline().decode()
if line:
self.log.info(line[:-1])
self.log.info(line.rstrip("\n"))
return line

raise Exception("No data in stderr or in stdout.")

@staticmethod
def _extract_job(line):
# Job id info: https://goo.gl/SE29y9.
job_id_pattern = re.compile(
br'.*console.cloud.google.com/dataflow.*/jobs/([a-z|0-9|A-Z|\-|\_]+).*')
matched_job = job_id_pattern.search(line or '')
# [EWT-361] : Fixes out of date regex to extract job id
matched_job = JOB_ID_PATTERN.search(line or '')
if matched_job:
return matched_job.group(1).decode()
return matched_job.group('job_id_java') or matched_job.group('job_id_python')

def wait_for_done(self):
reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
Expand Down

0 comments on commit b210dbf

Please sign in to comment.