From ce8be07adf0ea8191db04ef77d0355dfe520a271 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Gajowy?= Date: Wed, 7 Aug 2019 14:09:54 +0200 Subject: [PATCH] [BEAM-5980] Remove redundant Combine Performance tests for Python SDK running on Flink It turned out we tested nothing with the three tests. They mapped records of different sizes to Long which basically made the size irrelevant for the rest of the pipeline. --- .../job_LoadTests_Combine_Flink_Python.groovy | 81 +------------------ 1 file changed, 4 insertions(+), 77 deletions(-) diff --git a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy index a99381a20e415..cb4c94a93a4fc 100644 --- a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy +++ b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy @@ -32,73 +32,7 @@ String pythonHarnessImageTag = "${dockerRegistryRoot}/python:${dockerTag}" String flinkVersion = '1.7' String flinkDownloadUrl = 'https://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.11.tgz' -def loadTestConfigurationsFiveWorkers = { datasetName -> [ - [ - title : 'Combine Python Load test: 2GB 10 byte records', - itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally', - runner : CommonTestProperties.Runner.PORTABLE, - jobProperties: [ - job_name : 'load-tests-python-flink-batch-combine-1-' + now, - project : 'apache-beam-testing', - publish_to_big_query: true, - metrics_dataset : datasetName, - metrics_table : 'python_flink_batch_combine_1', - input_options : '\'{' + - '"num_records": 200000000,' + - '"key_size": 1,' + - '"value_size": 9}\'', - parallelism : 5, - job_endpoint : 'localhost:8099', - environment_config : pythonHarnessImageTag, - environment_type : 'DOCKER', - top_count : 20, - ] - ], - [ - title : 'Combine Python Load test: 2GB 100 byte records', - itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally', - runner : CommonTestProperties.Runner.PORTABLE, - jobProperties: [ - job_name : 'load-tests-python-flink-batch-combine-2-' + now, - project : 'apache-beam-testing', - publish_to_big_query: true, - metrics_dataset : datasetName, - metrics_table : 'python_flink_batch_combine_2', - input_options : '\'{' + - '"num_records": 20000000,' + - '"key_size": 10,' + - '"value_size": 90}\'', - parallelism : 5, - job_endpoint : 'localhost:8099', - environment_config : pythonHarnessImageTag, - environment_type : 'DOCKER', - top_count : 20, - ] - ], - [ - title : 'Combine Python Load test: 2GB 100 kilobyte records', - itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally', - runner : CommonTestProperties.Runner.PORTABLE, - jobProperties: [ - job_name : 'load-tests-python-flink-batch-combine-3-' + now, - project : 'apache-beam-testing', - publish_to_big_query: true, - metrics_dataset : datasetName, - metrics_table : 'python_flink_batch_combine_3', - input_options : '\'{' + - '"num_records": 2000,' + - '"key_size": 100000,' + - '"value_size": 90}\'', - parallelism : 5, - job_endpoint : 'localhost:8099', - environment_config : pythonHarnessImageTag, - environment_type : 'DOCKER', - top_count : 20, - ] - ], -]} - -def loadTestConfigurationsSixteenWorkers = { datasetName -> [ +def loadTestConfigurations = { datasetName -> [ [ title : 'Combine Python Load test: 2GB Fanout 4', itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally', @@ -149,22 +83,15 @@ def batchLoadTestJob = { scope, triggeringContext -> scope.description('Runs Python Combine load tests on Flink runner in batch mode') commonJobProperties.setTopLevelMainJobProperties(scope, 'master', 240) - def numberOfWorkers = 16 - def scaledNumberOfWorkers = 5 def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', triggeringContext) infra.prepareSDKHarness(scope, CommonTestProperties.SDK.PYTHON, dockerRegistryRoot, dockerTag) infra.prepareFlinkJobServer(scope, flinkVersion, dockerRegistryRoot, dockerTag) - infra.setupFlinkCluster(scope, jenkinsJobName, flinkDownloadUrl, pythonHarnessImageTag, jobServerImageTag, numberOfWorkers) - def testConfigs = loadTestConfigurationsSixteenWorkers(datasetName) - for (config in testConfigs) { - loadTestsBuilder.loadTest(scope, config.title, config.runner, CommonTestProperties.SDK.PYTHON, config.jobProperties, config.itClass) - } - - infra.scaleCluster(scope, jenkinsJobName, scaledNumberOfWorkers) + def numberOfWorkers = 16 + infra.setupFlinkCluster(scope, jenkinsJobName, flinkDownloadUrl, pythonHarnessImageTag, jobServerImageTag, numberOfWorkers) - testConfigs = loadTestConfigurationsFiveWorkers(datasetName) + def testConfigs = loadTestConfigurations(datasetName) for (config in testConfigs) { loadTestsBuilder.loadTest(scope, config.title, config.runner, CommonTestProperties.SDK.PYTHON, config.jobProperties, config.itClass) }