Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-5980] Remove redundant combine tests #9286

Merged
merged 2 commits into from
Aug 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 19 additions & 57 deletions .test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

import CommonJobProperties as commonJobProperties
import CommonTestProperties
import Infrastructure as infra
import LoadTestsBuilder as loadTestsBuilder
import PhraseTriggeringPostCommitBuilder
import Infrastructure as infra

String jenkinsJobName = 'beam_LoadTests_Python_Combine_Flink_Batch'
String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
Expand All @@ -32,7 +32,7 @@ String pythonHarnessImageTag = "${dockerRegistryRoot}/python:${dockerTag}"
String flinkVersion = '1.7'
String flinkDownloadUrl = 'https://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.11.tgz'

def loadTestConfigurationsFiveWorkers = { datasetName -> [
def scenarios = { datasetName -> [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that datasetName is not used. Is that intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used to fill `metrics_dataset` according to the triggering context (PR run vs. not a PR).

[
title : 'Combine Python Load test: 2GB 10 byte records',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
Expand All @@ -54,51 +54,6 @@ def loadTestConfigurationsFiveWorkers = { datasetName -> [
top_count : 20,
]
],
[
title : 'Combine Python Load test: 2GB 100 byte records',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
runner : CommonTestProperties.Runner.PORTABLE,
jobProperties: [
job_name : 'load-tests-python-flink-batch-combine-2-' + now,
project : 'apache-beam-testing',
publish_to_big_query: true,
metrics_dataset : datasetName,
metrics_table : 'python_flink_batch_combine_2',
input_options : '\'{' +
'"num_records": 20000000,' +
'"key_size": 10,' +
'"value_size": 90}\'',
parallelism : 5,
job_endpoint : 'localhost:8099',
environment_config : pythonHarnessImageTag,
environment_type : 'DOCKER',
top_count : 20,
]
],
[
title : 'Combine Python Load test: 2GB 100 kilobyte records',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
runner : CommonTestProperties.Runner.PORTABLE,
jobProperties: [
job_name : 'load-tests-python-flink-batch-combine-3-' + now,
project : 'apache-beam-testing',
publish_to_big_query: true,
metrics_dataset : datasetName,
metrics_table : 'python_flink_batch_combine_3',
input_options : '\'{' +
'"num_records": 2000,' +
'"key_size": 100000,' +
'"value_size": 90}\'',
parallelism : 5,
job_endpoint : 'localhost:8099',
environment_config : pythonHarnessImageTag,
environment_type : 'DOCKER',
top_count : 20,
]
],
]}

def loadTestConfigurationsSixteenWorkers = { datasetName -> [
[
title : 'Combine Python Load test: 2GB Fanout 4',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
Expand Down Expand Up @@ -142,36 +97,43 @@ def loadTestConfigurationsSixteenWorkers = { datasetName -> [
fanout : 8,
top_count : 20,
]
],
]
]}

// Defines the batch load-test job: runs the Combine scenarios on a 16-worker
// Flink cluster first (fan-out tests), then scales the cluster down to 5
// workers for the small-records scenario, and finally tears the cluster down.
def batchLoadTestJob = { scope, triggeringContext ->
  scope.description('Runs Python Combine load tests on Flink runner in batch mode')
  commonJobProperties.setTopLevelMainJobProperties(scope, 'master', 240)

  // BUG FIX: scaledNumberOfWorkers was declared with `def` twice in this
  // closure scope (once here and again before scaleCluster), which is a
  // Groovy "variable already declared" compile error. Declare it once.
  def numberOfWorkers = 16
  def scaledNumberOfWorkers = 5
  // Dataset name depends on the triggering context (PR run vs. post-commit).
  def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', triggeringContext)

  List<Map> testScenarios = scenarios(datasetName)

  infra.prepareSDKHarness(scope, CommonTestProperties.SDK.PYTHON, dockerRegistryRoot, dockerTag)
  infra.prepareFlinkJobServer(scope, flinkVersion, dockerRegistryRoot, dockerTag)
  infra.setupFlinkCluster(scope, jenkinsJobName, flinkDownloadUrl, pythonHarnessImageTag, jobServerImageTag, numberOfWorkers)

  // Fan-out scenarios need the full 16-worker cluster.
  defineTestSteps(scope, testScenarios, [
      'Combine Python Load test: 2GB Fanout 4',
      'Combine Python Load test: 2GB Fanout 8'
  ])

  infra.scaleCluster(scope, jenkinsJobName, scaledNumberOfWorkers)

  defineTestSteps(scope, testScenarios, ['Combine Python Load test: 2GB 10 byte records'])

  infra.teardownDataproc(scope, jenkinsJobName)
}

/**
 * Schedules load-test steps for every scenario whose title appears in {@code titles}.
 *
 * @param scope         Jenkins job scope the test steps are added to
 * @param testScenarios all available scenario configuration maps
 * @param titles        titles of the scenarios that should be run
 * @return the scenarios that matched {@code titles} and were scheduled
 */
private List<Map> defineTestSteps(scope, List<Map> testScenarios, List<String> titles) {
  // BUG FIX: the original returned the result of forEach(), which is void,
  // so the declared List<Map> return value was always null. Collect the
  // matching scenarios first, run them, and return them.
  List<Map> matched = testScenarios.findAll { it.title in titles }
  matched.each {
    loadTestsBuilder.loadTest(scope, it.title, it.runner, CommonTestProperties.SDK.PYTHON, it.jobProperties, it.itClass)
  }
  return matched
}

PhraseTriggeringPostCommitBuilder.postCommitJob(
'beam_LoadTests_Python_Combine_Flink_Batch',
'Run Load Tests Python Combine Flink Batch',
Expand Down
109 changes: 26 additions & 83 deletions .test-infra/jenkins/job_LoadTests_Combine_Java.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,101 +16,44 @@
* limitations under the License.
*/


import CommonJobProperties as commonJobProperties
import CommonTestProperties
import CronJobBuilder
import LoadTestsBuilder as loadTestsBuilder
import PhraseTriggeringPostCommitBuilder
import CronJobBuilder

def commonLoadTestConfig = { jobType, isStreaming, datasetName ->
[
[
title : 'Load test: 2GB of 10B records',
itClass : 'org.apache.beam.sdk.loadtests.CombineLoadTest',
runner : CommonTestProperties.Runner.DATAFLOW,
jobProperties: [
project : 'apache-beam-testing',
appName : "load_tests_Java_Dataflow_${jobType}_Combine_1",
tempLocation : 'gs://temp-storage-for-perf-tests/loadtests',
publishToBigQuery : true,
bigQueryDataset : datasetName,
bigQueryTable : "java_dataflow_${jobType}_Combine_1",
sourceOptions : """
[
[
title : 'Load test: 2GB of 10B records',
itClass : 'org.apache.beam.sdk.loadtests.CombineLoadTest',
runner : CommonTestProperties.Runner.DATAFLOW,
jobProperties: [
project : 'apache-beam-testing',
appName : "load_tests_Java_Dataflow_${jobType}_Combine_1",
tempLocation : 'gs://temp-storage-for-perf-tests/loadtests',
publishToBigQuery : true,
bigQueryDataset : datasetName,
bigQueryTable : "java_dataflow_${jobType}_Combine_1",
sourceOptions : """
{
"numRecords": 200000000,
"keySizeBytes": 1,
"valueSizeBytes": 9
}
""".trim().replaceAll("\\s", ""),
fanout : 1,
iterations : 1,
topCount : 20,
maxNumWorkers : 5,
numWorkers : 5,
autoscalingAlgorithm: "NONE",
perKeyCombiner : "TOP_LARGEST",
streaming : isStreaming
]
],
[
title : 'Load test: 2GB of 100B records',
itClass : 'org.apache.beam.sdk.loadtests.CombineLoadTest',
runner : CommonTestProperties.Runner.DATAFLOW,
jobProperties: [
project : 'apache-beam-testing',
appName : "load_tests_Java_Dataflow_${jobType}_Combine_2",
tempLocation : 'gs://temp-storage-for-perf-tests/loadtests',
publishToBigQuery : true,
bigQueryDataset : datasetName,
bigQueryTable : "java_dataflow_${jobType}_Combine_2",
sourceOptions : """
{
"numRecords": 20000000,
"keySizeBytes": 10,
"valueSizeBytes": 90
}
""".trim().replaceAll("\\s", ""),
fanout : 1,
iterations : 1,
topCount : 20,
maxNumWorkers : 5,
numWorkers : 5,
autoscalingAlgorithm: "NONE",
perKeyCombiner : "TOP_LARGEST",
streaming : isStreaming
]
],
[

title : 'Load test: 2GB of 100kB records',
itClass : 'org.apache.beam.sdk.loadtests.CombineLoadTest',
runner : CommonTestProperties.Runner.DATAFLOW,
jobProperties: [
project : 'apache-beam-testing',
appName : "load_tests_Java_Dataflow_${jobType}_Combine_3",
tempLocation : 'gs://temp-storage-for-perf-tests/loadtests',
publishToBigQuery : true,
bigQueryDataset : datasetName,
bigQueryTable : "java_dataflow_${jobType}_Combine_3",
sourceOptions : """
{
"numRecords": 2000,
"keySizeBytes": 100000,
"valueSizeBytes": 900000
}
""".trim().replaceAll("\\s", ""),
fanout : 1,
iterations : 1,
topCount : 20,
maxNumWorkers : 5,
numWorkers : 5,
autoscalingAlgorithm: "NONE",
perKeyCombiner : "TOP_LARGEST",
streaming : isStreaming
]

],
[
fanout : 1,
iterations : 1,
topCount : 20,
maxNumWorkers : 5,
numWorkers : 5,
autoscalingAlgorithm: "NONE",
perKeyCombiner : "TOP_LARGEST",
streaming : isStreaming
]
],
[
title : 'Load test: fanout 4 times with 2GB 10-byte records total',
itClass : 'org.apache.beam.sdk.loadtests.CombineLoadTest',
runner : CommonTestProperties.Runner.DATAFLOW,
Expand Down
44 changes: 0 additions & 44 deletions .test-infra/jenkins/job_LoadTests_Combine_Python.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -45,50 +45,6 @@ def loadTestConfigurations = { datasetName -> [
top_count : 20,
]
],
[
title : 'Combine Python Load test: 2GB 100 byte records',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
runner : CommonTestProperties.Runner.DATAFLOW,
sdk : CommonTestProperties.SDK.PYTHON,
jobProperties: [
job_name : 'load-tests-python-dataflow-batch-combine-2-' + now,
project : 'apache-beam-testing',
temp_location : 'gs://temp-storage-for-perf-tests/smoketests',
publish_to_big_query : true,
metrics_dataset : datasetName,
metrics_table : 'python_dataflow_batch_combine_2',
input_options : '\'{' +
'"num_records": 20000000,' +
'"key_size": 10,' +
'"value_size": 90}\'',
max_num_workers : 5,
num_workers : 5,
autoscaling_algorithm: "NONE",
top_count : 20,
]
],
[
title : 'Combine Python Load test: 2GB 100 kilobyte records',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
runner : CommonTestProperties.Runner.DATAFLOW,
sdk : CommonTestProperties.SDK.PYTHON,
jobProperties: [
job_name : 'load-tests-python-dataflow-batch-combine-3-' + now,
project : 'apache-beam-testing',
temp_location : 'gs://temp-storage-for-perf-tests/smoketests',
publish_to_big_query : true,
metrics_dataset : datasetName,
metrics_table : 'python_dataflow_batch_combine_3',
input_options : '\'{' +
'"num_records": 2000,' +
'"key_size": 100000,' +
'"value_size": 90}\'',
max_num_workers : 5,
num_workers : 5,
autoscaling_algorithm: "NONE",
top_count : 20,
]
],
[
title : 'Combine Python Load test: 2GB Fanout 4',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
Expand Down