diff --git a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
index a99381a20e415..5c8c5afb4f86d 100644
--- a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
@@ -18,9 +18,9 @@
 
 import CommonJobProperties as commonJobProperties
 import CommonTestProperties
+import Infrastructure as infra
 import LoadTestsBuilder as loadTestsBuilder
 import PhraseTriggeringPostCommitBuilder
-import Infrastructure as infra
 
 String jenkinsJobName = 'beam_LoadTests_Python_Combine_Flink_Batch'
 String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
@@ -32,7 +32,7 @@ String pythonHarnessImageTag = "${dockerRegistryRoot}/python:${dockerTag}"
 String flinkVersion = '1.7'
 String flinkDownloadUrl = 'https://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.11.tgz'
 
-def loadTestConfigurationsFiveWorkers = { datasetName -> [
+def scenarios = { datasetName -> [
     [
         title        : 'Combine Python Load test: 2GB 10 byte records',
         itClass      : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
@@ -96,9 +96,6 @@ def loadTestConfigurationsFiveWorkers = { datasetName -> [
             top_count          : 20,
         ]
     ],
-]}
-
-def loadTestConfigurationsSixteenWorkers = { datasetName -> [
     [
         title        : 'Combine Python Load test: 2GB Fanout 4',
         itClass      : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
@@ -142,7 +139,7 @@ def loadTestConfigurationsSixteenWorkers = { datasetName -> [
             fanout             : 8,
             top_count          : 20,
         ]
-    ],
+    ]
 ]}
 
 def batchLoadTestJob = { scope, triggeringContext ->
@@ -150,28 +147,39 @@ def batchLoadTestJob = { scope, triggeringContext ->
   commonJobProperties.setTopLevelMainJobProperties(scope, 'master', 240)
 
   def numberOfWorkers = 16
-  def scaledNumberOfWorkers = 5
   def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', triggeringContext)
 
+  List testScenarios = scenarios(datasetName)
+
   infra.prepareSDKHarness(scope, CommonTestProperties.SDK.PYTHON, dockerRegistryRoot, dockerTag)
   infra.prepareFlinkJobServer(scope, flinkVersion, dockerRegistryRoot, dockerTag)
   infra.setupFlinkCluster(scope, jenkinsJobName, flinkDownloadUrl, pythonHarnessImageTag, jobServerImageTag, numberOfWorkers)
 
-  def testConfigs = loadTestConfigurationsSixteenWorkers(datasetName)
-  for (config in testConfigs) {
-    loadTestsBuilder.loadTest(scope, config.title, config.runner, CommonTestProperties.SDK.PYTHON, config.jobProperties, config.itClass)
-  }
+  defineTestSteps(scope, testScenarios, [
+      'Combine Python Load test: 2GB Fanout 4',
+      'Combine Python Load test: 2GB Fanout 8'
+  ])
 
+  def scaledNumberOfWorkers = 5
   infra.scaleCluster(scope, jenkinsJobName, scaledNumberOfWorkers)
 
-  testConfigs = loadTestConfigurationsFiveWorkers(datasetName)
-  for (config in testConfigs) {
-    loadTestsBuilder.loadTest(scope, config.title, config.runner, CommonTestProperties.SDK.PYTHON, config.jobProperties, config.itClass)
-  }
+  defineTestSteps(scope, testScenarios, [
+      'Combine Python Load test: 2GB 10 byte records',
+      'Combine Python Load test: 2GB 100 byte records',
+      'Combine Python Load test: 2GB 100 kilobyte records'
+  ])
 
   infra.teardownDataproc(scope, jenkinsJobName)
 }
 
+private List defineTestSteps(scope, List testScenarios, List titles) {
+  return testScenarios
+      .findAll { it.title in titles }
+      .forEach {
+        loadTestsBuilder.loadTest(scope, it.title, it.runner, CommonTestProperties.SDK.PYTHON, it.jobProperties, it.itClass)
+      }
+}
+
 PhraseTriggeringPostCommitBuilder.postCommitJob(
     'beam_LoadTests_Python_Combine_Flink_Batch',
     'Run Load Tests Python Combine Flink Batch',