diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java index 282a3f9a04484..00d03066b6fff 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java @@ -50,7 +50,7 @@ public DestConfig(String index, String pipeline) { public DestConfig(final StreamInput in) throws IOException { index = in.readString(); - if (in.getVersion().onOrAfter(Version.CURRENT)) { + if (in.getVersion().onOrAfter(Version.V_7_3_0)) { pipeline = in.readOptionalString(); } else { pipeline = null; @@ -72,7 +72,7 @@ public boolean isValid() { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(index); - if (out.getVersion().onOrAfter(Version.CURRENT)) { + if (out.getVersion().onOrAfter(Version.V_7_3_0)) { out.writeOptionalString(pipeline); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java index fad9836b760d8..d028d3248c8f6 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java @@ -15,6 +15,8 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats; @@ -222,6 +224,16 @@ static Map extractIndexCheckPoints(ShardStats[] shards, Set checkpoints = checkpointsByIndex.get(indexName); diff --git a/x-pack/qa/rolling-upgrade/build.gradle b/x-pack/qa/rolling-upgrade/build.gradle index 6fb89ab47a658..46e9bc10b21ed 100644 --- a/x-pack/qa/rolling-upgrade/build.gradle +++ b/x-pack/qa/rolling-upgrade/build.gradle @@ -240,7 +240,9 @@ for (Version version : bwcVersions.wireCompatible) { 'mixed_cluster/30_ml_jobs_crud/Create a job in the mixed cluster and write some data', 'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed in mixed cluster', 'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed without aggs in mixed cluster', - 'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed with aggs in mixed cluster' + 'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed with aggs in mixed cluster', + 'mixed_cluster/80_data_frame_jobs_crud/Test put batch data frame transforms on mixed cluster', + 'mixed_cluster/80_data_frame_jobs_crud/Test put continuous data frame transform on mixed cluster' ].join(',') finalizedBy "${baseName}#oldClusterTestCluster#node1.stop" } diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java new file mode 100644 index 0000000000000..1aee9130ddc50 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java @@ -0,0 +1,279 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.upgrades; + +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.Version; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState; +import org.elasticsearch.client.dataframe.transforms.DestConfig; +import org.elasticsearch.client.dataframe.transforms.SourceConfig; +import org.elasticsearch.client.dataframe.transforms.TimeSyncConfig; +import org.elasticsearch.client.dataframe.transforms.pivot.GroupConfig; +import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig; +import org.elasticsearch.client.dataframe.transforms.pivot.TermsGroupSource; +import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.xpack.test.rest.XPackRestTestConstants; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; + +@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/43662") +public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { + + private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version")); + private static final String DATAFRAME_ENDPOINT = "/_data_frame/transforms/"; + private static final String CONTINUOUS_DATA_FRAME_ID = "continuous-data-frame-upgrade-job"; + private static final String CONTINUOUS_DATA_FRAME_SOURCE = "data-frame-upgrade-continuous-source"; + private static final List ENTITIES = Stream.iterate(1, n -> n + 1) + .limit(5) + .map(v -> "user_" + v) + .collect(Collectors.toList()); + private static final List BUCKETS = Stream.iterate(1, n -> n + 1) + .limit(5) + .map(TimeValue::timeValueMinutes) + .collect(Collectors.toList()); + + @Override + protected Collection templatesToWaitFor() { + return Stream.concat(XPackRestTestConstants.DATA_FRAME_TEMPLATES.stream(), + super.templatesToWaitFor().stream()).collect(Collectors.toSet()); + } + + protected static void waitForPendingDataFrameTasks() throws Exception { + waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("data_frame/transforms") == false); + } + + /** + * The purpose of this test is to ensure that when a job is open through a rolling upgrade we upgrade the results + * index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade + */ + public void testDataFramesRollingUpgrade() throws Exception { + assumeTrue("Continuous data frames not supported until 7.3", UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_3_0)); + Request waitForYellow = new Request("GET", "/_cluster/health"); + waitForYellow.addParameter("wait_for_nodes", "3"); + waitForYellow.addParameter("wait_for_status", "yellow"); + switch (CLUSTER_TYPE) { + case OLD: + createAndStartContinuousDataFrame(); + break; + case MIXED: + client().performRequest(waitForYellow); + long lastCheckpoint = 1; + if (Booleans.parseBoolean(System.getProperty("tests.first_round")) == false) { + lastCheckpoint = 2; + } + verifyContinuousDataFrameHandlesData(lastCheckpoint); + break; + case UPGRADED: + client().performRequest(waitForYellow); + verifyContinuousDataFrameHandlesData(3); + cleanUpTransforms(); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + } + + private void cleanUpTransforms() throws Exception { + stopTransform(CONTINUOUS_DATA_FRAME_ID); + deleteTransform(CONTINUOUS_DATA_FRAME_ID); + waitForPendingDataFrameTasks(); + } + + private void createAndStartContinuousDataFrame() throws Exception { + createIndex(CONTINUOUS_DATA_FRAME_SOURCE); + long totalDocsWritten = 0; + for (TimeValue bucket : BUCKETS) { + int docs = randomIntBetween(1, 25); + putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, bucket, ENTITIES); + totalDocsWritten += docs * ENTITIES.size(); + } + + DataFrameTransformConfig config = DataFrameTransformConfig.builder() + .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(30))) + .setPivotConfig(PivotConfig.builder() + .setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("stars").field("stars"))) + .setGroups(GroupConfig.builder().groupBy("user_id", TermsGroupSource.builder().setField("user_id").build()).build()) + .build()) + .setDest(DestConfig.builder().setIndex(CONTINUOUS_DATA_FRAME_ID + "_idx").build()) + .setSource(SourceConfig.builder().setIndex(CONTINUOUS_DATA_FRAME_SOURCE).build()) + .setId(CONTINUOUS_DATA_FRAME_ID) + .build(); + putTransform(CONTINUOUS_DATA_FRAME_ID, config); + + startTransform(CONTINUOUS_DATA_FRAME_ID); + waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, 0L); + + DataFrameTransformStateAndStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); + + assertThat(stateAndStats.getTransformStats().getOutputDocuments(), equalTo((long)ENTITIES.size())); + assertThat(stateAndStats.getTransformStats().getNumDocuments(), equalTo(totalDocsWritten)); + assertThat(stateAndStats.getTransformState().getTaskState(), equalTo(DataFrameTransformTaskState.STARTED)); + } + + private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) throws Exception { + + // A continuous data frame should automatically become started when it gets assigned to a node + // if it was assigned to the node that was removed from the cluster + assertBusy(() -> { + DataFrameTransformStateAndStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); + assertThat(stateAndStats.getTransformState().getTaskState(), equalTo(DataFrameTransformTaskState.STARTED)); + }, + 120, + TimeUnit.SECONDS); + + DataFrameTransformStateAndStats previousStateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); + + // Add a new user and write data to it + // This is so we can have more reliable data counts, as writing to existing entities requires + // rescanning the past data + List entities = new ArrayList<>(1); + entities.add("user_" + ENTITIES.size() + expectedLastCheckpoint); + int docs = 5; + // Index the data very recently in the past so that the transform sync delay can catch up to reading it in our spin + // wait later. + putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, TimeValue.timeValueSeconds(1), entities); + + waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, expectedLastCheckpoint); + + assertBusy(() -> assertThat( + getTransformStats(CONTINUOUS_DATA_FRAME_ID).getTransformStats().getNumDocuments(), + greaterThanOrEqualTo(docs + previousStateAndStats.getTransformStats().getNumDocuments())), + 120, + TimeUnit.SECONDS); + DataFrameTransformStateAndStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID); + + assertThat(stateAndStats.getTransformState().getTaskState(), + equalTo(DataFrameTransformTaskState.STARTED)); + assertThat(stateAndStats.getTransformStats().getOutputDocuments(), + greaterThan(previousStateAndStats.getTransformStats().getOutputDocuments())); + assertThat(stateAndStats.getTransformStats().getNumDocuments(), + greaterThanOrEqualTo(docs + previousStateAndStats.getTransformStats().getNumDocuments())); + } + + private void putTransform(String id, DataFrameTransformConfig config) throws IOException { + final Request createDataframeTransformRequest = new Request("PUT", DATAFRAME_ENDPOINT + id); + createDataframeTransformRequest.setJsonEntity(Strings.toString(config)); + Response response = client().performRequest(createDataframeTransformRequest); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + private void deleteTransform(String id) throws IOException { + Response response = client().performRequest(new Request("DELETE", DATAFRAME_ENDPOINT + id)); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + private void startTransform(String id) throws IOException { + final Request startDataframeTransformRequest = new Request("POST", DATAFRAME_ENDPOINT + id + "/_start"); + Response response = client().performRequest(startDataframeTransformRequest); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + private void stopTransform(String id) throws IOException { + final Request stopDataframeTransformRequest = new Request("POST", + DATAFRAME_ENDPOINT + id + "/_stop?wait_for_completion=true"); + Response response = client().performRequest(stopDataframeTransformRequest); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + private DataFrameTransformStateAndStats getTransformStats(String id) throws IOException { + final Request getStats = new Request("GET", DATAFRAME_ENDPOINT + id + "/_stats"); + Response response = client().performRequest(getStats); + assertEquals(200, response.getStatusLine().getStatusCode()); + XContentType xContentType = XContentType.fromMediaTypeOrFormat(response.getEntity().getContentType().getValue()); + try (XContentParser parser = xContentType.xContent().createParser( + NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + response.getEntity().getContent())) { + GetDataFrameTransformStatsResponse resp = GetDataFrameTransformStatsResponse.fromXContent(parser); + assertThat(resp.getTransformsStateAndStats(), hasSize(1)); + return resp.getTransformsStateAndStats().get(0); + } + } + + private void waitUntilAfterCheckpoint(String id, long currentCheckpoint) throws Exception { + assertBusy(() -> assertThat(getTransformStats(id).getTransformState().getCheckpoint(), greaterThan(currentCheckpoint)), + 60, TimeUnit.SECONDS); + } + + private void createIndex(String indexName) throws IOException { + // create mapping + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("mappings") + .startObject("properties") + .startObject("timestamp") + .field("type", "date") + .endObject() + .startObject("user_id") + .field("type", "keyword") + .endObject() + .startObject("stars") + .field("type", "integer") + .endObject() + .endObject() + .endObject(); + } + builder.endObject(); + final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); + Request req = new Request("PUT", indexName); + req.setEntity(entity); + client().performRequest(req); + } + } + + private void putData(String indexName, int numDocs, TimeValue fromTime, List entityIds) throws IOException { + long timeStamp = Instant.now().toEpochMilli() - fromTime.getMillis(); + + // create index + final StringBuilder bulk = new StringBuilder(); + for (int i = 0; i < numDocs; i++) { + for (String entity : entityIds) { + bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n") + .append("{\"user_id\":\"") + .append(entity) + .append("\",\"stars\":") + .append(randomLongBetween(0, 5)) + .append(",\"timestamp\":") + .append(timeStamp) + .append("}\n"); + } + } + bulk.append("\r\n"); + final Request bulkRequest = new Request("POST", "/_bulk"); + bulkRequest.addParameter("refresh", "true"); + bulkRequest.setJsonEntity(bulk.toString()); + entityAsMap(client().performRequest(bulkRequest)); + } +} diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml new file mode 100644 index 0000000000000..9454423e98953 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_data_frame_jobs_crud.yml @@ -0,0 +1,292 @@ +--- +"Test put batch data frame transforms on mixed cluster": + - do: + cluster.health: + index: "dataframe-transform-airline-data" + wait_for_status: green + timeout: 70s + + - do: + data_frame.put_data_frame_transform: + transform_id: "mixed-simple-transform" + body: > + { + "source": { "index": "dataframe-transform-airline-data" }, + "dest": { "index": "mixed-simple-transform-idx" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - match: { acknowledged: true } + + - do: + data_frame.start_data_frame_transform: + transform_id: "mixed-simple-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "mixed-simple-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.put_data_frame_transform: + transform_id: "mixed-complex-transform" + body: > + { + "source": { + "index": "dataframe-transform-airline-data", + "query": { + "bool": { + "filter": {"term": {"airline": "ElasticAir"}} + } + } + }, + "dest": { + "index": "mixed-complex-transform-idx", + "pipeline": "data_frame_simple_pipeline" + }, + "pivot": { + "group_by": { + "airline": {"terms": {"field": "airline"}}, + "day": {"date_histogram": {"field": "timestamp", "calendar_interval": "1d"}}, + "every_50": {"histogram": {"field": "responsetime", "interval": 50}} + }, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform: + transform_id: "mixed-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-complex-transform" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.start_data_frame_transform: + transform_id: "mixed-complex-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-complex-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "mixed-complex-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-complex-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + +--- +"Test put continuous data frame transform on mixed cluster": + - do: + cluster.health: + index: "dataframe-transform-airline-data-cont" + wait_for_status: green + timeout: 70s + + - do: + data_frame.put_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" + body: > + { + "source": { "index": "dataframe-transform-airline-data-cont" }, + "dest": { "index": "mixed-simple-continuous-transform-idx" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + }, + "sync": { + "time": { + "field": "time", + "delay": "90m" + } + } + } + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-continuous-transform" } + - match: { transforms.0.sync.time.field: "time" } + - match: { transforms.0.sync.time.delay: "90m" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.start_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-continuous-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + +--- +"Test GET, start, and stop old cluster batch transforms": + - do: + cluster.health: + index: "dataframe-transform-airline-data" + wait_for_status: green + timeout: 70s + + - do: + data_frame.get_data_frame_transform: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.source.index.0: "dataframe-transform-airline-data" } + - match: { transforms.0.dest.index: "old-simple-transform-idx" } + - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } + - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-simple-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-simple-transform" + wait_for_completion: true + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.get_data_frame_transform: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.source.index.0: "dataframe-transform-airline-data" } + - match: { transforms.0.dest.index: "old-complex-transform-idx" } + - match: { transforms.0.dest.pipeline: "data_frame_simple_pipeline" } + - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } + - match: { transforms.0.pivot.group_by.day.date_histogram.field: "timestamp" } + - match: { transforms.0.pivot.group_by.every_50.histogram.field: "responsetime" } + - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-complex-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-complex-transform" + wait_for_completion: true + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + +--- +"Test GET, stop, start, old continuous transforms": + - do: + cluster.health: + index: "dataframe-transform-airline-data-cont" + wait_for_status: green + timeout: 70s + + - do: + data_frame.get_data_frame_transform: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.sync.time.field: "time" } + - match: { transforms.0.sync.time.delay: "90m" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-simple-continuous-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-simple-continuous-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml new file mode 100644 index 0000000000000..7b666c2caa35f --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_data_frame_jobs_crud.yml @@ -0,0 +1,218 @@ +--- +"Test put batch data frame transforms on old cluster": + - do: + indices.create: + index: dataframe-transform-airline-data + body: + mappings: + properties: + time: + type: date + airline: + type: keyword + responsetime: + type: float + event_rate: + type: integer + - do: + cluster.health: + index: "dataframe-transform-airline-data" + wait_for_status: green + timeout: 70s + + - do: + ingest.put_pipeline: + id: "data_frame_simple_pipeline" + body: > + { + "processors": [ + { + "set" : { + "field" : "my_field", + "value": 42 + } + } + ] + } + - do: + data_frame.put_data_frame_transform: + transform_id: "old-simple-transform" + body: > + { + "source": { "index": "dataframe-transform-airline-data" }, + "dest": { "index": "old-simple-transform-idx" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-simple-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-simple-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.put_data_frame_transform: + transform_id: "old-complex-transform" + body: > + { + "source": { + "index": "dataframe-transform-airline-data", + "query": { + "bool": { + "filter": {"term": {"airline": "ElasticAir"}} + } + } + }, + "dest": { + "index": "old-complex-transform-idx", + "pipeline": "data_frame_simple_pipeline" + }, + "pivot": { + "group_by": { + "airline": {"terms": {"field": "airline"}}, + "day": {"date_histogram": {"field": "timestamp", "calendar_interval": "1d"}}, + "every_50": {"histogram": {"field": "responsetime", "interval": 50}} + }, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-complex-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-complex-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + +--- +"Test put continuous data frame transform on old cluster": + - do: + indices.create: + index: dataframe-transform-airline-data-cont + body: + mappings: + properties: + time: + type: date + airline: + type: keyword + responsetime: + type: float + event_rate: + type: integer + - do: + cluster.health: + index: "dataframe-transform-airline-data-cont" + wait_for_status: green + timeout: 70s + + - do: + data_frame.put_data_frame_transform: + transform_id: "old-simple-continuous-transform" + body: > + { + "source": { "index": "dataframe-transform-airline-data-cont" }, + "dest": { "index": "old-simple-continuous-transform-idx" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + }, + "sync": { + "time": { + "field": "time", + "delay": "90m" + } + } + } + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.sync.time.field: "time" } + - match: { transforms.0.sync.time.delay: "90m" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-simple-continuous-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-simple-continuous-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml new file mode 100644 index 0000000000000..ea63950d7fc64 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml @@ -0,0 +1,259 @@ +setup: + - do: + cluster.health: + wait_for_status: green + wait_for_nodes: 3 + # wait for long enough that we give delayed unassigned shards to stop being delayed + timeout: 70s +--- +"Get start, stop, and delete old and mixed cluster batch data frame transforms": + # Simple and complex OLD transforms + - do: + data_frame.get_data_frame_transform: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.source.index.0: "dataframe-transform-airline-data" } + - match: { transforms.0.dest.index: "old-simple-transform-idx" } + - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } + - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-simple-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-simple-transform" + wait_for_completion: true + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + - do: + data_frame.get_data_frame_transform: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.source.index.0: "dataframe-transform-airline-data" } + - match: { transforms.0.dest.index: "old-complex-transform-idx" } + - match: { transforms.0.dest.pipeline: "data_frame_simple_pipeline" } + - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } + - match: { transforms.0.pivot.group_by.day.date_histogram.field: "timestamp" } + - match: { transforms.0.pivot.group_by.every_50.histogram.field: "responsetime" } + - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-complex-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-complex-transform" + wait_for_completion: true + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-complex-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + # Simple and complex Mixed cluster transforms + - do: + data_frame.get_data_frame_transform: + transform_id: "mixed-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-transform" } + - match: { transforms.0.source.index.0: "dataframe-transform-airline-data" } + - match: { transforms.0.dest.index: "mixed-simple-transform-idx" } + - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } + - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "mixed-simple-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "mixed-simple-transform" + wait_for_completion: true + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.get_data_frame_transform: + transform_id: "mixed-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-complex-transform" } + - match: { transforms.0.source.index.0: "dataframe-transform-airline-data" } + - match: { transforms.0.dest.index: "mixed-complex-transform-idx" } + - match: { transforms.0.dest.pipeline: "data_frame_simple_pipeline" } + - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } + - match: { transforms.0.pivot.group_by.day.date_histogram.field: "timestamp" } + - match: { transforms.0.pivot.group_by.every_50.histogram.field: "responsetime" } + - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } + + - do: + data_frame.start_data_frame_transform: + transform_id: "mixed-complex-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-complex-transform" } + - match: { transforms.0.state.task_state: "/started|stopped/" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "mixed-complex-transform" + wait_for_completion: true + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-complex-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-complex-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + +# Delete all old and mixed transforms + - do: + data_frame.delete_data_frame_transform: + transform_id: "old-simple-transform" + + - do: + data_frame.delete_data_frame_transform: + transform_id: "mixed-simple-transform" + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-transform,mixed-simple-transform" + - match: { count: 0 } + +--- +"Test GET, stop, delete, old and mixed continuous transforms": + - do: + data_frame.get_data_frame_transform: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.sync.time.field: "time" } + - match: { transforms.0.sync.time.delay: "90m" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.start_data_frame_transform: + transform_id: "old-simple-continuous-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "old-simple-continuous-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "old-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "old-simple-continuous-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.delete_data_frame_transform: + transform_id: "old-simple-continuous-transform" + + - do: + data_frame.get_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-continuous-transform" } + - match: { transforms.0.sync.time.field: "time" } + - match: { transforms.0.sync.time.delay: "90m" } + - is_true: transforms.0.version + - is_true: transforms.0.create_time + + - do: + data_frame.start_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" + - match: { acknowledged: true } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-continuous-transform" } + - match: { transforms.0.state.task_state: "started" } + + - do: + data_frame.stop_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" + wait_for_completion: true + - match: { acknowledged: true } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "mixed-simple-continuous-transform" + - match: { count: 1 } + - match: { transforms.0.id: "mixed-simple-continuous-transform" } + - match: { transforms.0.state.indexer_state: "stopped" } + - match: { transforms.0.state.task_state: "stopped" } + + - do: + data_frame.delete_data_frame_transform: + transform_id: "mixed-simple-continuous-transform" diff --git a/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java b/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java index bfdf051a29235..5b4a6addc3ec5 100644 --- a/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java +++ b/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java @@ -30,6 +30,12 @@ public final class XPackRestTestConstants { public static final List ML_POST_V660_TEMPLATES = List.of(AUDITOR_NOTIFICATIONS_INDEX, ML_META_INDEX_NAME, STATE_INDEX_PREFIX, RESULTS_INDEX_PREFIX, CONFIG_INDEX); + // Data Frame constants: + public static final String DATA_FRAME_INTERNAL_INDEX = ".data-frame-internal-1"; + public static final String DATA_FRAME_NOTIFICATIONS_INDEX = ".data-frame-notifications-1"; + + public static final List DATA_FRAME_TEMPLATES = List.of(DATA_FRAME_INTERNAL_INDEX, DATA_FRAME_NOTIFICATIONS_INDEX); + private XPackRestTestConstants() { } }