Skip to content

Commit

Permalink
[BEAM-5980] Refactor: keep all Flink Python Combine load test scenari…
Browse files Browse the repository at this point in the history
…os in one place
  • Loading branch information
lgajowy committed Aug 19, 2019
1 parent ff0f308 commit d44db9a
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions .test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

import CommonJobProperties as commonJobProperties
import CommonTestProperties
import Infrastructure as infra
import LoadTestsBuilder as loadTestsBuilder
import PhraseTriggeringPostCommitBuilder
import Infrastructure as infra

String jenkinsJobName = 'beam_LoadTests_Python_Combine_Flink_Batch'
String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
Expand All @@ -32,7 +32,7 @@ String pythonHarnessImageTag = "${dockerRegistryRoot}/python:${dockerTag}"
String flinkVersion = '1.7'
String flinkDownloadUrl = 'https://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.11.tgz'

def loadTestConfigurationsFiveWorkers = { datasetName -> [
def scenarios = { datasetName -> [
[
title : 'Combine Python Load test: 2GB 10 byte records',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
Expand Down Expand Up @@ -96,9 +96,6 @@ def loadTestConfigurationsFiveWorkers = { datasetName -> [
top_count : 20,
]
],
]}

def loadTestConfigurationsSixteenWorkers = { datasetName -> [
[
title : 'Combine Python Load test: 2GB Fanout 4',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
Expand Down Expand Up @@ -142,36 +139,47 @@ def loadTestConfigurationsSixteenWorkers = { datasetName -> [
fanout : 8,
top_count : 20,
]
],
]
]}

def batchLoadTestJob = { scope, triggeringContext ->
scope.description('Runs Python Combine load tests on Flink runner in batch mode')
commonJobProperties.setTopLevelMainJobProperties(scope, 'master', 240)

def numberOfWorkers = 16
def scaledNumberOfWorkers = 5
def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', triggeringContext)

List<Map> testScenarios = scenarios(datasetName)

infra.prepareSDKHarness(scope, CommonTestProperties.SDK.PYTHON, dockerRegistryRoot, dockerTag)
infra.prepareFlinkJobServer(scope, flinkVersion, dockerRegistryRoot, dockerTag)
infra.setupFlinkCluster(scope, jenkinsJobName, flinkDownloadUrl, pythonHarnessImageTag, jobServerImageTag, numberOfWorkers)

def testConfigs = loadTestConfigurationsSixteenWorkers(datasetName)
for (config in testConfigs) {
loadTestsBuilder.loadTest(scope, config.title, config.runner, CommonTestProperties.SDK.PYTHON, config.jobProperties, config.itClass)
}
defineTestSteps(scope, testScenarios, [
'Combine Python Load test: 2GB Fanout 4',
'Combine Python Load test: 2GB Fanout 8'
])

def scaledNumberOfWorkers = 5
infra.scaleCluster(scope, jenkinsJobName, scaledNumberOfWorkers)

testConfigs = loadTestConfigurationsFiveWorkers(datasetName)
for (config in testConfigs) {
loadTestsBuilder.loadTest(scope, config.title, config.runner, CommonTestProperties.SDK.PYTHON, config.jobProperties, config.itClass)
}
defineTestSteps(scope, testScenarios, [
'Combine Python Load test: 2GB 10 byte records',
'Combine Python Load test: 2GB 100 byte records',
'Combine Python Load test: 2GB 100 kilobyte records'
])

infra.teardownDataproc(scope, jenkinsJobName)
}

private List<Map> defineTestSteps(scope, List<Map> testScenarios, List<String> titles) {
return testScenarios
.findAll { it.title in titles }
.forEach {
loadTestsBuilder.loadTest(scope, it.title, it.runner, CommonTestProperties.SDK.PYTHON, it.jobProperties, it.itClass)
}
}

PhraseTriggeringPostCommitBuilder.postCommitJob(
'beam_LoadTests_Python_Combine_Flink_Batch',
'Run Load Tests Python Combine Flink Batch',
Expand Down

0 comments on commit d44db9a

Please sign in to comment.