From ba6950e7fcb7c4f3a0f8b3e9308803a2e18b51db Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Mon, 3 Sep 2018 18:09:22 +0200 Subject: [PATCH] Watcher: Ensure that execution triggers properly on initial setup This commit reverts most of #33157 as it introduces another race condition and breaks a common case of watcher, when the first watch is added to the system and the index does not exist yet. This means, that the index will be created, which triggers a reload, but during this time the put watch operation that triggered this is not yet indexed, so that both processes finish roughly add the same time and should not overwrite each other but act complementary. This commit reverts the logic of cleaning out the ticker engine watches on start up, as this is done already when the execution is paused - which also gets paused on the cluster state listener again, as we can be sure here, that the watches index has not yet been created. This also adds a new test, that starts a one node cluster and emulates the case of a non existing watches index and a watch being added, which should result in proper execution. Closes #33320 --- .../test/rest/ESRestTestCase.java | 2 +- .../xpack/watcher/WatcherService.java | 23 +++--- .../engine/TickerScheduleTriggerEngine.java | 14 +++- .../xpack/watcher/WatcherServiceTests.java | 74 +++++++++++-------- .../test/integration/SingleNodeTests.java | 66 +++++++++++++++++ .../engine/TickerScheduleEngineTests.java | 36 --------- .../SmokeTestWatcherWithSecurityIT.java | 36 ++++----- 7 files changed, 147 insertions(+), 104 deletions(-) create mode 100644 x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SingleNodeTests.java diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index ecb965040f87b..35b7c58a18eb6 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -621,7 +621,7 @@ private static boolean isXPackTemplate(String name) { if (name.startsWith(".monitoring-")) { return true; } - if (name.startsWith(".watch-history-")) { + if (name.startsWith(".watch") || name.startsWith(".triggered_watches")) { return true; } if (name.startsWith(".ml-")) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 599287bb50a76..75fd13915de3e 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -27,12 +27,12 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.upgrade.UpgradeField; import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.watch.Watch; @@ -63,7 +63,6 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; import static org.elasticsearch.xpack.core.watcher.support.Exceptions.illegalState; import static org.elasticsearch.xpack.core.watcher.watch.Watch.INDEX; @@ -92,7 +91,7 @@ public class WatcherService extends AbstractComponent { this.scrollSize = settings.getAsInt("xpack.watcher.watch.scroll.size", 100); this.defaultSearchTimeout = settings.getAsTime("xpack.watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30)); this.parser = parser; - this.client = client; + this.client = ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN); this.executor = executor; } @@ -184,6 +183,10 @@ void reload(ClusterState state, String reason) { // changes processedClusterStateVersion.set(state.getVersion()); + triggerService.pauseExecution(); + int cancelledTaskCount = executionService.clearExecutionsAndQueue(); + logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); + executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false), e -> logger.error("error reloading watcher", e))); } @@ -232,10 +235,6 @@ private synchronized boolean reloadInner(ClusterState state, String reason, bool // also this is the place where we pause the trigger service execution and clear the current execution service, so that we make sure // that existing executions finish, but no new ones are executed if (processedClusterStateVersion.get() == state.getVersion()) { - triggerService.pauseExecution(); - int cancelledTaskCount = executionService.clearExecutionsAndQueue(); - logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); - executionService.unPause(); triggerService.start(watches); if (triggeredWatches.isEmpty() == false) { @@ -273,7 +272,7 @@ private Collection loadWatches(ClusterState clusterState) { SearchResponse response = null; List watches = new ArrayList<>(); - try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { + try { RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(INDEX)) .actionGet(TimeValue.timeValueSeconds(5)); if (refreshResponse.getSuccessfulShards() < indexMetaData.getNumberOfShards()) { @@ -357,11 +356,9 @@ private Collection loadWatches(ClusterState clusterState) { } } finally { if (response != null) { - try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(response.getScrollId()); - client.clearScroll(clearScrollRequest).actionGet(scrollTimeout); - } + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(response.getScrollId()); + client.clearScroll(clearScrollRequest).actionGet(scrollTimeout); } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java index 4c10f794880b9..bd0204766aff4 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java @@ -22,6 +22,7 @@ import java.time.Clock; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -49,14 +50,23 @@ public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleR @Override public synchronized void start(Collection jobs) { long startTime = clock.millis(); - Map schedules = new ConcurrentHashMap<>(); + Map schedules = new HashMap<>(jobs.size()); for (Watch job : jobs) { if (job.trigger() instanceof ScheduleTrigger) { ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime)); } } - this.schedules = schedules; + // why are we calling putAll() here instead of assigning a brand + // new concurrent hash map you may ask yourself over here + // This requires some explanation how TriggerEngine.start() is + // invoked, when a reload due to the cluster state listener is done + // If the watches index does not exist, and new document is stored, + // then the creation of that index will trigger a reload which calls + // this method. The index operation however will run at the same time + // as the reload, so if we clean out the old data structure here, + // that can lead to that one watch not being triggered + this.schedules.putAll(schedules); } @Override diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index 73f9271e3efda..f1c711ae00a93 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -6,19 +6,21 @@ package org.elasticsearch.xpack.watcher; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.search.ClearScrollAction; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponseSections; +import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; -import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -42,7 +44,6 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.watcher.trigger.Trigger; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; @@ -55,6 +56,7 @@ import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import org.junit.Before; import org.mockito.ArgumentCaptor; import java.util.Collections; @@ -67,6 +69,7 @@ import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -76,6 +79,16 @@ public class WatcherServiceTests extends ESTestCase { private final ExecutorService executorService = EsExecutors.newDirectExecutorService(); + private final Client client = mock(Client.class); + + @Before + public void configureMockClient() { + when(client.settings()).thenReturn(Settings.EMPTY); + ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + } + public void testValidateStartWithClosedIndex() { TriggerService triggerService = mock(TriggerService.class); TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class); @@ -83,7 +96,7 @@ public void testValidateStartWithClosedIndex() { WatchParser parser = mock(WatchParser.class); WatcherService service = new WatcherService(Settings.EMPTY, triggerService, triggeredWatchStore, - executionService, parser, mock(Client.class), executorService) { + executionService, parser, client, executorService) { @Override void stopExecutor() { } @@ -102,18 +115,11 @@ void stopExecutor() { } public void testLoadOnlyActiveWatches() throws Exception { - // this is just, so we dont have to add any mocking to the threadpool - Settings settings = Settings.builder().put(XPackSettings.SECURITY_ENABLED.getKey(), false).build(); - TriggerService triggerService = mock(TriggerService.class); TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class); ExecutionService executionService = mock(ExecutionService.class); WatchParser parser = mock(WatchParser.class); - Client client = mock(Client.class); - ThreadPool threadPool = mock(ThreadPool.class); - when(client.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); - WatcherService service = new WatcherService(settings, triggerService, triggeredWatchStore, + WatcherService service = new WatcherService(Settings.EMPTY, triggerService, triggeredWatchStore, executionService, parser, client, executorService) { @Override void stopExecutor() { @@ -150,21 +156,21 @@ void stopExecutor() { RefreshResponse refreshResponse = mock(RefreshResponse.class); when(refreshResponse.getSuccessfulShards()) .thenReturn(clusterState.getMetaData().getIndices().get(Watch.INDEX).getNumberOfShards()); - AdminClient adminClient = mock(AdminClient.class); - IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); - when(client.admin()).thenReturn(adminClient); - when(adminClient.indices()).thenReturn(indicesAdminClient); - PlainActionFuture refreshFuture = new PlainActionFuture<>(); - when(indicesAdminClient.refresh(any(RefreshRequest.class))).thenReturn(refreshFuture); - refreshFuture.onResponse(refreshResponse); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(refreshResponse); + return null; + }).when(client).execute(eq(RefreshAction.INSTANCE), any(RefreshRequest.class), any(ActionListener.class)); // empty scroll response, no further scrolling needed SearchResponseSections scrollSearchSections = new SearchResponseSections(SearchHits.empty(), null, null, false, false, null, 1); SearchResponse scrollSearchResponse = new SearchResponse(scrollSearchSections, "scrollId", 1, 1, 0, 10, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); - PlainActionFuture searchScrollResponseFuture = new PlainActionFuture<>(); - when(client.searchScroll(any(SearchScrollRequest.class))).thenReturn(searchScrollResponseFuture); - searchScrollResponseFuture.onResponse(scrollSearchResponse); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(scrollSearchResponse); + return null; + }).when(client).execute(eq(SearchScrollAction.INSTANCE), any(SearchScrollRequest.class), any(ActionListener.class)); // one search response containing active and inactive watches int count = randomIntBetween(2, 200); @@ -192,13 +198,17 @@ void stopExecutor() { SearchResponseSections sections = new SearchResponseSections(searchHits, null, null, false, false, null, 1); SearchResponse searchResponse = new SearchResponse(sections, "scrollId", 1, 1, 0, 10, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); - PlainActionFuture searchResponseFuture = new PlainActionFuture<>(); - when(client.search(any(SearchRequest.class))).thenReturn(searchResponseFuture); - searchResponseFuture.onResponse(searchResponse); - - PlainActionFuture clearScrollFuture = new PlainActionFuture<>(); - when(client.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollFuture); - clearScrollFuture.onResponse(new ClearScrollResponse(true, 1)); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(searchResponse); + return null; + }).when(client).execute(eq(SearchAction.INSTANCE), any(SearchRequest.class), any(ActionListener.class)); + + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(new ClearScrollResponse(true, 1)); + return null; + }).when(client).execute(eq(ClearScrollAction.INSTANCE), any(ClearScrollRequest.class), any(ActionListener.class)); service.start(clusterState, () -> {}); @@ -228,7 +238,7 @@ public void testPausingWatcherServiceAlsoPausesTriggerService() { assertThat(triggerService.count(), is(1L)); WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class), - mock(ExecutionService.class), mock(WatchParser.class), mock(Client.class), executorService) { + mock(ExecutionService.class), mock(WatchParser.class), client, executorService) { @Override void stopExecutor() { } @@ -245,7 +255,7 @@ public void testReloadingWatcherDoesNotPauseExecutionService() { ExecutionService executionService = mock(ExecutionService.class); TriggerService triggerService = mock(TriggerService.class); WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class), - executionService, mock(WatchParser.class), mock(Client.class), executorService) { + executionService, mock(WatchParser.class), client, executorService) { @Override void stopExecutor() { } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SingleNodeTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SingleNodeTests.java new file mode 100644 index 0000000000000..2109f2a2d95cc --- /dev/null +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SingleNodeTests.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher.test.integration; + +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.xpack.core.watcher.watch.Watch; +import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; +import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule; +import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; + +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction; +import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput; +import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; + +@ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, maxNumDataNodes = 1, supportsDedicatedMasters = false) +public class SingleNodeTests extends AbstractWatcherIntegrationTestCase { + + @Override + protected boolean timeWarped() { + return false; + } + + // this is the standard setup when starting watcher in a regular cluster + // the index does not exist, a watch gets added + // the watch should be executed properly, despite the index being created and the cluster state listener being reloaded + public void testThatLoadingWithNonExistingIndexWorks() throws Exception { + stopWatcher(); + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get(); + IndexMetaData metaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, clusterStateResponse.getState().metaData()); + String watchIndexName = metaData.getIndex().getName(); + assertAcked(client().admin().indices().prepareDelete(watchIndexName)); + startWatcher(); + + String watchId = randomAlphaOfLength(20); + // now we start with an empty set up, store a watch and expected it to be executed + PutWatchResponse putWatchResponse = watcherClient().preparePutWatch(watchId) + .setSource(watchBuilder() + .trigger(schedule(interval(1, IntervalSchedule.Interval.Unit.SECONDS))) + .input(simpleInput()) + .addAction("_logger", loggingAction("logging of watch _name"))) + .get(); + assertThat(putWatchResponse.isCreated(), is(true)); + + assertBusy(() -> { + client().admin().indices().prepareRefresh(".watcher-history*"); + SearchResponse searchResponse = client().prepareSearch(".watcher-history*").setSize(0).get(); + assertThat(searchResponse.getHits().getTotalHits(), is(greaterThanOrEqualTo(1L))); + }, 5, TimeUnit.SECONDS); + } + +} diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java index 6680b38ab94b3..db1d3767b59cd 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java @@ -35,9 +35,7 @@ import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.daily; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.weekly; -import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.startsWith; import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Mockito.mock; @@ -110,40 +108,6 @@ public void accept(Iterable events) { assertThat(bits.cardinality(), is(count)); } - public void testStartClearsExistingSchedules() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - List firedWatchIds = new ArrayList<>(); - engine.register(new Consumer>() { - @Override - public void accept(Iterable events) { - for (TriggerEvent event : events) { - firedWatchIds.add(event.jobName()); - } - latch.countDown(); - } - }); - - int count = randomIntBetween(2, 5); - List watches = new ArrayList<>(); - for (int i = 0; i < count; i++) { - watches.add(createWatch(String.valueOf(i), interval("1s"))); - } - engine.start(watches); - - watches.clear(); - for (int i = 0; i < count; i++) { - watches.add(createWatch("another_id" + i, interval("1s"))); - } - engine.start(watches); - - advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); - if (!latch.await(3 * count, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - - assertThat(firedWatchIds, everyItem(startsWith("another_id"))); - } - public void testAddHourly() throws Exception { final String name = "job_name"; final CountDownLatch latch = new CountDownLatch(1); diff --git a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java index 17fbf0769fd47..25b19aeea3ba5 100644 --- a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java +++ b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java @@ -59,20 +59,20 @@ public void startWatcher() throws Exception { String state = objectPath.evaluate("stats.0.watcher_state"); switch (state) { - case "stopped": - Response startResponse = adminClient().performRequest(new Request("POST", "/_xpack/watcher/_start")); - String body = EntityUtils.toString(startResponse.getEntity()); - assertThat(body, containsString("\"acknowledged\":true")); - break; - case "stopping": - throw new AssertionError("waiting until stopping state reached stopped state to start again"); - case "starting": - throw new AssertionError("waiting until starting state reached started state"); - case "started": - // all good here, we are done - break; - default: - throw new AssertionError("unknown state[" + state + "]"); + case "stopped": + Response startResponse = adminClient().performRequest(new Request("POST", "/_xpack/watcher/_start")); + Map responseMap = entityAsMap(startResponse); + assertThat(responseMap, hasEntry("acknowledged", true)); + break; + case "stopping": + throw new AssertionError("waiting until stopping state reached stopped state to start again"); + case "starting": + throw new AssertionError("waiting until starting state reached started state"); + case "started": + // all good here, we are done + break; + default: + throw new AssertionError("unknown state[" + state + "]"); } } catch (IOException e) { throw new AssertionError(e); @@ -135,7 +135,6 @@ protected Settings restAdminSettings() { } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33320") public void testSearchInputHasPermissions() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -159,7 +158,6 @@ public void testSearchInputHasPermissions() throws Exception { assertThat(conditionMet, is(true)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29893") public void testSearchInputWithInsufficientPrivileges() throws Exception { String indexName = "index_not_allowed_to_read"; try (XContentBuilder builder = jsonBuilder()) { @@ -186,7 +184,6 @@ public void testSearchInputWithInsufficientPrivileges() throws Exception { assertThat(conditionMet, is(false)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33320") public void testSearchTransformHasPermissions() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -216,7 +213,6 @@ public void testSearchTransformHasPermissions() throws Exception { assertThat(value, is("15")); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33291") public void testSearchTransformInsufficientPermissions() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -244,7 +240,6 @@ public void testSearchTransformInsufficientPermissions() throws Exception { assertThat(response.getStatusLine().getStatusCode(), is(404)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30777") public void testIndexActionHasPermissions() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -269,7 +264,6 @@ public void testIndexActionHasPermissions() throws Exception { assertThat(spam, is("eggs")); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33320") public void testIndexActionInsufficientPrivileges() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -299,6 +293,8 @@ private void indexWatch(String watchId, XContentBuilder builder) throws Exceptio Response response = client().performRequest(request); Map responseMap = entityAsMap(response); assertThat(responseMap, hasEntry("_id", watchId)); + assertThat(responseMap, hasEntry("created", true)); + assertThat(responseMap, hasEntry("_version", 1)); } private ObjectPath getWatchHistoryEntry(String watchId) throws Exception {