Skip to content

Commit

Permalink
[7.x] [ML][Data Frame] Adding bwc tests for pivot transform (#43506) (#…
Browse files Browse the repository at this point in the history
…43929) (#43947)

* [ML][Data Frame] Adding bwc tests for pivot transform (#43506)

* [ML][Data Frame] Adding bwc tests for pivot transform

* adding continuous transforms

* adding continuous dataframes to bwc

* adding continuous data frame tests

* Adding rolling upgrade tests for continuous df

* Fixing test

* Adjusting indices used in BWC, and handling NPE for seq_no_stats

* updating and muting specific bwc test

* Adjusting bwc tests for backport
  • Loading branch information
benwtrent committed Jul 3, 2019
1 parent f8fd432 commit c8b0424
Show file tree
Hide file tree
Showing 6 changed files with 523 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
Expand Down Expand Up @@ -222,6 +224,16 @@ static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<Stri
for (ShardStats shard : shards) {
String indexName = shard.getShardRouting().getIndexName();
if (userIndices.contains(indexName)) {
SeqNoStats seqNoStats = shard.getSeqNoStats();
// SeqNoStats could be `null`. This indicates that an `AlreadyClosed` exception was thrown somewhere down the stack,
// which in turn means that the index COULD be closed, or at least that the shard is not fully recovered yet.
if (seqNoStats == null) {
logger.warn("failure gathering checkpoint information for index [{}] as seq_no_stats were null. Shard Stats [{}]",
indexName,
Strings.toString(shard));
throw new CheckpointException(
"Unable to gather checkpoint information for index [" + indexName + "]. seq_no_stats are missing.");
}
if (checkpointsByIndex.containsKey(indexName)) {
// we have already seen this index, just check/add shards
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(indexName);
Expand Down
42 changes: 35 additions & 7 deletions x-pack/qa/rolling-upgrade/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ for (Version version : bwcVersions.wireCompatible) {
oldClusterTestRunner.configure {
    // Version the cluster is being upgraded from, without the snapshot qualifier.
    def upgradeFromVersion = version.toString().replace('-SNAPSHOT', '')
    systemProperty('tests.rest.suite', 'old_cluster')
    systemProperty('tests.upgrade_from_version', upgradeFromVersion)
    // Dataframe transforms were not added until 7.2.0, so older clusters skip the test that creates them.
    if (version.before('7.2.0')) {
        systemProperty('tests.rest.blacklist',
            'old_cluster/80_data_frame_jobs_crud/Test put batch data frame transforms on old cluster')
    }
}

Closure configureUpgradeCluster = {String name, Task lastRunner, int stopNode, Closure getOtherUnicastHostAddresses ->
Expand Down Expand Up @@ -227,12 +233,18 @@ for (Version version : bwcVersions.wireCompatible) {
systemProperty 'tests.first_round', 'true'
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
// We only need to run these tests once so we may as well do it when we're two thirds upgraded
systemProperty 'tests.rest.blacklist', [
'mixed_cluster/10_basic/Start scroll in mixed cluster on upgraded node that we will continue after upgrade',
'mixed_cluster/30_ml_jobs_crud/Create a job in the mixed cluster and write some data',
'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed without aggs in mixed cluster',
'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed with aggs in mixed cluster'
].join(',')
def toBlackList = [
'mixed_cluster/10_basic/Start scroll in mixed cluster on upgraded node that we will continue after upgrade',
'mixed_cluster/30_ml_jobs_crud/Create a job in the mixed cluster and write some data',
'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed without aggs in mixed cluster',
'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed with aggs in mixed cluster',
'mixed_cluster/80_data_frame_jobs_crud/Test put batch data frame transforms on mixed cluster'
]
// Dataframe transforms were not added until 7.2.0
if (version.before('7.2.0')) {
toBlackList << 'mixed_cluster/80_data_frame_jobs_crud/Test GET, start, and stop old cluster batch transforms'
}
systemProperty 'tests.rest.blacklist', toBlackList.join(',')
finalizedBy "${baseName}#oldClusterTestCluster#node1.stop"
}

Expand All @@ -248,6 +260,14 @@ for (Version version : bwcVersions.wireCompatible) {
systemProperty 'tests.first_round', 'false'
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
finalizedBy "${baseName}#oldClusterTestCluster#node2.stop"
// Dataframe transforms were not added until 7.2.0
if (version.before('7.2.0')) {
systemProperty 'tests.rest.blacklist', [
'mixed_cluster/80_data_frame_jobs_crud/Test put batch data frame transforms on mixed cluster',
'mixed_cluster/80_data_frame_jobs_crud/Test GET, start, and stop old cluster batch transforms'

].join(',')
}
}

Task upgradedClusterTest = tasks.create(name: "${baseName}#upgradedClusterTest", type: RestIntegTestTask)
Expand All @@ -272,12 +292,20 @@ for (Version version : bwcVersions.wireCompatible) {
// otherwise we could check the index created version
String versionStr = project.extensions.findByName("${baseName}#oldClusterTestCluster").properties.get('bwcVersion')
String[] versionParts = versionStr.split('\\.')
def toBlackList = []
if (versionParts[0].equals("5")) {
Integer minor = Integer.parseInt(versionParts[1])
if (minor >= 2) {
systemProperty 'tests.rest.blacklist', '/20_security/Verify default password migration results in upgraded cluster'
toBlackList << '/20_security/Verify default password migration results in upgraded cluster'
}
}
// Dataframe transforms were not added until 7.2.0
if (version.before('7.2.0')) {
toBlackList << 'upgraded_cluster/80_data_frame_jobs_crud/Get start, stop, and delete old and mixed cluster batch data frame transforms'
}
if (!toBlackList.empty) {
systemProperty 'tests.rest.blacklist', toBlackList.join(',')
}
}

Task versionBwcTest = tasks.create(name: "${baseName}#bwcTest") {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
---
# Creates, starts, and stops two batch pivot transforms ("mixed-simple-transform" and
# "mixed-complex-transform") over the "dataframe-transform-airline-data" index, checking the
# acknowledged responses and the reported transform state after each step.
"Test put batch data frame transforms on mixed cluster":
# Wait (up to 70s) for the source index to report green health before touching transforms.
- do:
cluster.health:
index: "dataframe-transform-airline-data"
wait_for_status: green
timeout: 70s

# Simple transform: group by airline and average the response time.
- do:
data_frame.put_data_frame_transform:
transform_id: "mixed-simple-transform"
body: >
{
"source": { "index": "dataframe-transform-airline-data" },
"dest": { "index": "mixed-simple-transform-idx" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- match: { acknowledged: true }

# Start the simple transform and confirm that stats are reported for it.
- do:
data_frame.start_data_frame_transform:
transform_id: "mixed-simple-transform"
- match: { acknowledged: true }
# The task_state assertion is a regex accepting either "started" or "stopped".
# NOTE(review): presumably a batch transform can finish before this stats call - confirm.
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "mixed-simple-transform"
- match: { count: 1 }
- match: { transforms.0.id: "mixed-simple-transform" }
- match: { transforms.0.state.task_state: "/started|stopped/" }

# Stop the simple transform, waiting for the stop to complete.
- do:
data_frame.stop_data_frame_transform:
transform_id: "mixed-simple-transform"
wait_for_completion: true
- match: { acknowledged: true }

# After the stop, both the indexer state and the task state must be "stopped".
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "mixed-simple-transform"
- match: { count: 1 }
- match: { transforms.0.id: "mixed-simple-transform" }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }

# Complex transform: filtered source query plus terms, date_histogram, and histogram group-bys.
- do:
data_frame.put_data_frame_transform:
transform_id: "mixed-complex-transform"
body: >
{
"source": {
"index": "dataframe-transform-airline-data",
"query": {
"bool": {
"filter": {"term": {"airline": "ElasticAir"}}
}
}
},
"dest": {
"index": "mixed-complex-transform-idx"
},
"pivot": {
"group_by": {
"airline": {"terms": {"field": "airline"}},
"day": {"date_histogram": {"field": "timestamp", "calendar_interval": "1d"}},
"every_50": {"histogram": {"field": "responsetime", "interval": 50}}
},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- match: { acknowledged: true }

# Confirm the complex transform config can be fetched by id.
- do:
data_frame.get_data_frame_transform:
transform_id: "mixed-complex-transform"
- match: { count: 1 }
- match: { transforms.0.id: "mixed-complex-transform" }

# Start the complex transform and confirm that stats are reported for it.
- do:
data_frame.start_data_frame_transform:
transform_id: "mixed-complex-transform"
- match: { acknowledged: true }
# As above, either "started" or "stopped" is accepted for task_state.
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "mixed-complex-transform"
- match: { count: 1 }
- match: { transforms.0.id: "mixed-complex-transform" }
- match: { transforms.0.state.task_state: "/started|stopped/" }

# Stop the complex transform, waiting for the stop to complete.
- do:
data_frame.stop_data_frame_transform:
transform_id: "mixed-complex-transform"
wait_for_completion: true
- match: { acknowledged: true }

# After the stop, both the indexer state and the task state must be "stopped".
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "mixed-complex-transform"
- match: { count: 1 }
- match: { transforms.0.id: "mixed-complex-transform" }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }

---
# Fetches, starts, and stops the transforms "old-simple-transform" and "old-complex-transform",
# asserting that their stored configuration and reported state are as expected.
# NOTE(review): these transforms are not created in this file; presumably the old_cluster suite
# creates them before the upgrade - confirm.
"Test GET, start, and stop old cluster batch transforms":
# Wait (up to 70s) for the source index to report green health before touching transforms.
- do:
cluster.health:
index: "dataframe-transform-airline-data"
wait_for_status: green
timeout: 70s

# The simple transform's config must come back with the expected source, dest, and pivot.
# Note that the source index is asserted as the first element of a list, and the
# aggregations are asserted under the "aggregations" key.
- do:
data_frame.get_data_frame_transform:
transform_id: "old-simple-transform"
- match: { count: 1 }
- match: { transforms.0.id: "old-simple-transform" }
- match: { transforms.0.source.index.0: "dataframe-transform-airline-data" }
- match: { transforms.0.dest.index: "old-simple-transform-idx" }
- match: { transforms.0.pivot.group_by.airline.terms.field: "airline" }
- match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" }

# Start the simple transform and confirm that stats are reported for it.
- do:
data_frame.start_data_frame_transform:
transform_id: "old-simple-transform"
- match: { acknowledged: true }
# The task_state assertion is a regex accepting either "started" or "stopped".
# NOTE(review): presumably a batch transform can finish before this stats call - confirm.
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "old-simple-transform"
- match: { count: 1 }
- match: { transforms.0.id: "old-simple-transform" }
- match: { transforms.0.state.task_state: "/started|stopped/" }

# Stop the simple transform, waiting for the stop to complete.
- do:
data_frame.stop_data_frame_transform:
transform_id: "old-simple-transform"
wait_for_completion: true
- match: { acknowledged: true }
# After the stop, both the indexer state and the task state must be "stopped".
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "old-simple-transform"
- match: { count: 1 }
- match: { transforms.0.id: "old-simple-transform" }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }

# The complex transform's config must come back with all three group-bys and the aggregation.
- do:
data_frame.get_data_frame_transform:
transform_id: "old-complex-transform"
- match: { count: 1 }
- match: { transforms.0.id: "old-complex-transform" }
- match: { transforms.0.source.index.0: "dataframe-transform-airline-data" }
- match: { transforms.0.dest.index: "old-complex-transform-idx" }
- match: { transforms.0.pivot.group_by.airline.terms.field: "airline" }
- match: { transforms.0.pivot.group_by.day.date_histogram.field: "timestamp" }
- match: { transforms.0.pivot.group_by.every_50.histogram.field: "responsetime" }
- match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" }

# Start the complex transform and confirm that stats are reported for it.
- do:
data_frame.start_data_frame_transform:
transform_id: "old-complex-transform"
- match: { acknowledged: true }
# As above, either "started" or "stopped" is accepted for task_state.
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "old-complex-transform"
- match: { count: 1 }
- match: { transforms.0.id: "old-complex-transform" }
- match: { transforms.0.state.task_state: "/started|stopped/" }

# Stop the complex transform, waiting for the stop to complete.
- do:
data_frame.stop_data_frame_transform:
transform_id: "old-complex-transform"
wait_for_completion: true
- match: { acknowledged: true }
# After the stop, both the indexer state and the task state must be "stopped".
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "old-complex-transform"
- match: { count: 1 }
- match: { transforms.0.id: "old-complex-transform" }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }
Loading

0 comments on commit c8b0424

Please sign in to comment.