diff --git a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy index a99381a20e415..cb4c94a93a4fc 100644 --- a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy +++ b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy @@ -32,73 +32,7 @@ String pythonHarnessImageTag = "${dockerRegistryRoot}/python:${dockerTag}" String flinkVersion = '1.7' String flinkDownloadUrl = 'https://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.11.tgz' -def loadTestConfigurationsFiveWorkers = { datasetName -> [ - [ - title : 'Combine Python Load test: 2GB 10 byte records', - itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally', - runner : CommonTestProperties.Runner.PORTABLE, - jobProperties: [ - job_name : 'load-tests-python-flink-batch-combine-1-' + now, - project : 'apache-beam-testing', - publish_to_big_query: true, - metrics_dataset : datasetName, - metrics_table : 'python_flink_batch_combine_1', - input_options : '\'{' + - '"num_records": 200000000,' + - '"key_size": 1,' + - '"value_size": 9}\'', - parallelism : 5, - job_endpoint : 'localhost:8099', - environment_config : pythonHarnessImageTag, - environment_type : 'DOCKER', - top_count : 20, - ] - ], - [ - title : 'Combine Python Load test: 2GB 100 byte records', - itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally', - runner : CommonTestProperties.Runner.PORTABLE, - jobProperties: [ - job_name : 'load-tests-python-flink-batch-combine-2-' + now, - project : 'apache-beam-testing', - publish_to_big_query: true, - metrics_dataset : datasetName, - metrics_table : 'python_flink_batch_combine_2', - input_options : '\'{' + - '"num_records": 20000000,' + - '"key_size": 10,' + - '"value_size": 90}\'', - parallelism : 5, - job_endpoint : 'localhost:8099', - environment_config : pythonHarnessImageTag, - environment_type : 'DOCKER', - top_count : 20, - ] - ], - [ - title : 'Combine Python Load test: 2GB 100 kilobyte records', - itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally', - runner : CommonTestProperties.Runner.PORTABLE, - jobProperties: [ - job_name : 'load-tests-python-flink-batch-combine-3-' + now, - project : 'apache-beam-testing', - publish_to_big_query: true, - metrics_dataset : datasetName, - metrics_table : 'python_flink_batch_combine_3', - input_options : '\'{' + - '"num_records": 2000,' + - '"key_size": 100000,' + - '"value_size": 90}\'', - parallelism : 5, - job_endpoint : 'localhost:8099', - environment_config : pythonHarnessImageTag, - environment_type : 'DOCKER', - top_count : 20, - ] - ], -]} - -def loadTestConfigurationsSixteenWorkers = { datasetName -> [ +def loadTestConfigurations = { datasetName -> [ [ title : 'Combine Python Load test: 2GB Fanout 4', itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally', @@ -149,22 +83,15 @@ def batchLoadTestJob = { scope, triggeringContext -> scope.description('Runs Python Combine load tests on Flink runner in batch mode') commonJobProperties.setTopLevelMainJobProperties(scope, 'master', 240) - def numberOfWorkers = 16 - def scaledNumberOfWorkers = 5 def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', triggeringContext) infra.prepareSDKHarness(scope, CommonTestProperties.SDK.PYTHON, dockerRegistryRoot, dockerTag) infra.prepareFlinkJobServer(scope, flinkVersion, dockerRegistryRoot, dockerTag) - infra.setupFlinkCluster(scope, jenkinsJobName, flinkDownloadUrl, pythonHarnessImageTag, jobServerImageTag, numberOfWorkers) - def testConfigs = loadTestConfigurationsSixteenWorkers(datasetName) - for (config in testConfigs) { - loadTestsBuilder.loadTest(scope, config.title, config.runner, CommonTestProperties.SDK.PYTHON, config.jobProperties, config.itClass) - } - - infra.scaleCluster(scope, jenkinsJobName, scaledNumberOfWorkers) + def numberOfWorkers = 16 + infra.setupFlinkCluster(scope, jenkinsJobName, flinkDownloadUrl, pythonHarnessImageTag, jobServerImageTag, numberOfWorkers) - testConfigs = loadTestConfigurationsFiveWorkers(datasetName) + def testConfigs = loadTestConfigurations(datasetName) for (config in testConfigs) { loadTestsBuilder.loadTest(scope, config.title, config.runner, CommonTestProperties.SDK.PYTHON, config.jobProperties, config.itClass) }