Skip to content

Commit

Permalink
Merge pull request #7778 from charlesccychen/python-version-tag
Browse files Browse the repository at this point in the history
Label Dataflow jobs with SDK name including Python major / minor version
  • Loading branch information
aaltay authored Feb 8, 2019
2 parents 622c9c8 + 302e7f7 commit af8c400
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def __init__(self, packages, options, environment_version, pipeline_url):
self.proto.userAgent.additionalProperties.extend([
dataflow.Environment.UserAgentValue.AdditionalProperty(
key='name',
value=to_json_value(shared_names.BEAM_SDK_NAME)),
value=to_json_value(self._get_python_sdk_name())),
dataflow.Environment.UserAgentValue.AdditionalProperty(
key='version', value=to_json_value(beam_version.__version__))])
# Version information.
Expand Down Expand Up @@ -280,6 +280,10 @@ def __init__(self, packages, options, environment_version, pipeline_url):
dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
key='display_data', value=to_json_value(items)))

def _get_python_sdk_name(self):
python_version = '%d.%d' % (sys.version_info[0], sys.version_info[1])
return 'Apache Beam Python %s SDK' % python_version


class Job(object):
"""Wrapper for a dataflow Job protobuf."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,19 @@ def test_experiment_use_multiple_sdk_containers(self):
self.assertNotIn(
"use_multiple_sdk_containers", environment.proto.experiments)

@mock.patch('apache_beam.runners.dataflow.internal.apiclient.sys')
def test_get_python_sdk_name(self, mock_sys):
pipeline_options = PipelineOptions(
['--project', 'test_project', '--job_name', 'test_job_name',
'--temp_location', 'gs://test-location/temp',
'--experiments', 'beam_fn_api',
'--experiments', 'use_multiple_sdk_containers'])
environment = apiclient.Environment(
[], pipeline_options, 1, FAKE_PIPELINE_URL)
mock_sys.version_info = [22, 333]
self.assertEqual('Apache Beam Python 22.333 SDK',
environment._get_python_sdk_name())


if __name__ == '__main__':
unittest.main()

0 comments on commit af8c400

Please sign in to comment.