diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index ecb965040f87b..35b7c58a18eb6 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -621,7 +621,7 @@ private static boolean isXPackTemplate(String name) { if (name.startsWith(".monitoring-")) { return true; } - if (name.startsWith(".watch-history-")) { + if (name.startsWith(".watch") || name.startsWith(".triggered_watches")) { return true; } if (name.startsWith(".ml-")) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 599287bb50a76..75fd13915de3e 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -27,12 +27,12 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.upgrade.UpgradeField; import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.watch.Watch; @@ -63,7 +63,6 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; import static org.elasticsearch.xpack.core.watcher.support.Exceptions.illegalState; import static org.elasticsearch.xpack.core.watcher.watch.Watch.INDEX; @@ -92,7 +91,7 @@ public class WatcherService extends AbstractComponent { this.scrollSize = settings.getAsInt("xpack.watcher.watch.scroll.size", 100); this.defaultSearchTimeout = settings.getAsTime("xpack.watcher.internal.ops.search.default_timeout", TimeValue.timeValueSeconds(30)); this.parser = parser; - this.client = client; + this.client = ClientHelper.clientWithOrigin(client, WATCHER_ORIGIN); this.executor = executor; } @@ -184,6 +183,10 @@ void reload(ClusterState state, String reason) { // changes processedClusterStateVersion.set(state.getVersion()); + triggerService.pauseExecution(); + int cancelledTaskCount = executionService.clearExecutionsAndQueue(); + logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); + executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false), e -> logger.error("error reloading watcher", e))); } @@ -232,10 +235,6 @@ private synchronized boolean reloadInner(ClusterState state, String reason, bool // also this is the place where we pause the trigger service execution and clear the current execution service, so that we make sure // that existing executions finish, but no new ones are executed if (processedClusterStateVersion.get() == state.getVersion()) { - triggerService.pauseExecution(); - int cancelledTaskCount = executionService.clearExecutionsAndQueue(); - logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); - executionService.unPause(); triggerService.start(watches); if (triggeredWatches.isEmpty() == false) { @@ -273,7 +272,7 @@ private Collection loadWatches(ClusterState clusterState) { SearchResponse response = null; List watches = new ArrayList<>(); - try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { + try { RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(INDEX)) .actionGet(TimeValue.timeValueSeconds(5)); if (refreshResponse.getSuccessfulShards() < indexMetaData.getNumberOfShards()) { @@ -357,11 +356,9 @@ private Collection loadWatches(ClusterState clusterState) { } } finally { if (response != null) { - try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(response.getScrollId()); - client.clearScroll(clearScrollRequest).actionGet(scrollTimeout); - } + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(response.getScrollId()); + client.clearScroll(clearScrollRequest).actionGet(scrollTimeout); } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java index 4c10f794880b9..bd0204766aff4 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java @@ -22,6 +22,7 @@ import java.time.Clock; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -49,14 +50,23 @@ public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleR @Override public synchronized void start(Collection jobs) { long startTime = clock.millis(); - Map schedules = new ConcurrentHashMap<>(); + Map schedules = new HashMap<>(jobs.size()); for (Watch job : jobs) { if (job.trigger() instanceof ScheduleTrigger) { ScheduleTrigger trigger = (ScheduleTrigger) job.trigger(); schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime)); } } - this.schedules = schedules; + // why are we calling putAll() here instead of assigning a brand + // new concurrent hash map you may ask yourself over here + // This requires some explanation how TriggerEngine.start() is + // invoked, when a reload due to the cluster state listener is done + // If the watches index does not exist, and new document is stored, + // then the creation of that index will trigger a reload which calls + // this method. The index operation however will run at the same time + // as the reload, so if we clean out the old data structure here, + // that can lead to that one watch not being triggered + this.schedules.putAll(schedules); } @Override diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index 73f9271e3efda..f1c711ae00a93 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -6,19 +6,21 @@ package org.elasticsearch.xpack.watcher; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.search.ClearScrollAction; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponseSections; +import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; -import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -42,7 +44,6 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.watcher.trigger.Trigger; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; @@ -55,6 +56,7 @@ import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import org.junit.Before; import org.mockito.ArgumentCaptor; import java.util.Collections; @@ -67,6 +69,7 @@ import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -76,6 +79,16 @@ public class WatcherServiceTests extends ESTestCase { private final ExecutorService executorService = EsExecutors.newDirectExecutorService(); + private final Client client = mock(Client.class); + + @Before + public void configureMockClient() { + when(client.settings()).thenReturn(Settings.EMPTY); + ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + } + public void testValidateStartWithClosedIndex() { TriggerService triggerService = mock(TriggerService.class); TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class); @@ -83,7 +96,7 @@ public void testValidateStartWithClosedIndex() { WatchParser parser = mock(WatchParser.class); WatcherService service = new WatcherService(Settings.EMPTY, triggerService, triggeredWatchStore, - executionService, parser, mock(Client.class), executorService) { + executionService, parser, client, executorService) { @Override void stopExecutor() { } @@ -102,18 +115,11 @@ void stopExecutor() { } public void testLoadOnlyActiveWatches() throws Exception { - // this is just, so we dont have to add any mocking to the threadpool - Settings settings = Settings.builder().put(XPackSettings.SECURITY_ENABLED.getKey(), false).build(); - TriggerService triggerService = mock(TriggerService.class); TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class); ExecutionService executionService = mock(ExecutionService.class); WatchParser parser = mock(WatchParser.class); - Client client = mock(Client.class); - ThreadPool threadPool = mock(ThreadPool.class); - when(client.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); - WatcherService service = new WatcherService(settings, triggerService, triggeredWatchStore, + WatcherService service = new WatcherService(Settings.EMPTY, triggerService, triggeredWatchStore, executionService, parser, client, executorService) { @Override void stopExecutor() { @@ -150,21 +156,21 @@ void stopExecutor() { RefreshResponse refreshResponse = mock(RefreshResponse.class); when(refreshResponse.getSuccessfulShards()) .thenReturn(clusterState.getMetaData().getIndices().get(Watch.INDEX).getNumberOfShards()); - AdminClient adminClient = mock(AdminClient.class); - IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); - when(client.admin()).thenReturn(adminClient); - when(adminClient.indices()).thenReturn(indicesAdminClient); - PlainActionFuture refreshFuture = new PlainActionFuture<>(); - when(indicesAdminClient.refresh(any(RefreshRequest.class))).thenReturn(refreshFuture); - refreshFuture.onResponse(refreshResponse); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(refreshResponse); + return null; + }).when(client).execute(eq(RefreshAction.INSTANCE), any(RefreshRequest.class), any(ActionListener.class)); // empty scroll response, no further scrolling needed SearchResponseSections scrollSearchSections = new SearchResponseSections(SearchHits.empty(), null, null, false, false, null, 1); SearchResponse scrollSearchResponse = new SearchResponse(scrollSearchSections, "scrollId", 1, 1, 0, 10, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); - PlainActionFuture searchScrollResponseFuture = new PlainActionFuture<>(); - when(client.searchScroll(any(SearchScrollRequest.class))).thenReturn(searchScrollResponseFuture); - searchScrollResponseFuture.onResponse(scrollSearchResponse); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(scrollSearchResponse); + return null; + }).when(client).execute(eq(SearchScrollAction.INSTANCE), any(SearchScrollRequest.class), any(ActionListener.class)); // one search response containing active and inactive watches int count = randomIntBetween(2, 200); @@ -192,13 +198,17 @@ void stopExecutor() { SearchResponseSections sections = new SearchResponseSections(searchHits, null, null, false, false, null, 1); SearchResponse searchResponse = new SearchResponse(sections, "scrollId", 1, 1, 0, 10, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); - PlainActionFuture searchResponseFuture = new PlainActionFuture<>(); - when(client.search(any(SearchRequest.class))).thenReturn(searchResponseFuture); - searchResponseFuture.onResponse(searchResponse); - - PlainActionFuture clearScrollFuture = new PlainActionFuture<>(); - when(client.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollFuture); - clearScrollFuture.onResponse(new ClearScrollResponse(true, 1)); + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(searchResponse); + return null; + }).when(client).execute(eq(SearchAction.INSTANCE), any(SearchRequest.class), any(ActionListener.class)); + + doAnswer(invocation -> { + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(new ClearScrollResponse(true, 1)); + return null; + }).when(client).execute(eq(ClearScrollAction.INSTANCE), any(ClearScrollRequest.class), any(ActionListener.class)); service.start(clusterState, () -> {}); @@ -228,7 +238,7 @@ public void testPausingWatcherServiceAlsoPausesTriggerService() { assertThat(triggerService.count(), is(1L)); WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class), - mock(ExecutionService.class), mock(WatchParser.class), mock(Client.class), executorService) { + mock(ExecutionService.class), mock(WatchParser.class), client, executorService) { @Override void stopExecutor() { } @@ -245,7 +255,7 @@ public void testReloadingWatcherDoesNotPauseExecutionService() { ExecutionService executionService = mock(ExecutionService.class); TriggerService triggerService = mock(TriggerService.class); WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class), - executionService, mock(WatchParser.class), mock(Client.class), executorService) { + executionService, mock(WatchParser.class), client, executorService) { @Override void stopExecutor() { } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SingleNodeTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SingleNodeTests.java new file mode 100644 index 0000000000000..2109f2a2d95cc --- /dev/null +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/SingleNodeTests.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher.test.integration; + +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.xpack.core.watcher.watch.Watch; +import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; +import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule; +import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; + +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction; +import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput; +import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; + +@ClusterScope(scope = SUITE, numClientNodes = 0, transportClientRatio = 0, maxNumDataNodes = 1, supportsDedicatedMasters = false) +public class SingleNodeTests extends AbstractWatcherIntegrationTestCase { + + @Override + protected boolean timeWarped() { + return false; + } + + // this is the standard setup when starting watcher in a regular cluster + // the index does not exist, a watch gets added + // the watch should be executed properly, despite the index being created and the cluster state listener being reloaded + public void testThatLoadingWithNonExistingIndexWorks() throws Exception { + stopWatcher(); + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get(); + IndexMetaData metaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, clusterStateResponse.getState().metaData()); + String watchIndexName = metaData.getIndex().getName(); + assertAcked(client().admin().indices().prepareDelete(watchIndexName)); + startWatcher(); + + String watchId = randomAlphaOfLength(20); + // now we start with an empty set up, store a watch and expected it to be executed + PutWatchResponse putWatchResponse = watcherClient().preparePutWatch(watchId) + .setSource(watchBuilder() + .trigger(schedule(interval(1, IntervalSchedule.Interval.Unit.SECONDS))) + .input(simpleInput()) + .addAction("_logger", loggingAction("logging of watch _name"))) + .get(); + assertThat(putWatchResponse.isCreated(), is(true)); + + assertBusy(() -> { + client().admin().indices().prepareRefresh(".watcher-history*"); + SearchResponse searchResponse = client().prepareSearch(".watcher-history*").setSize(0).get(); + assertThat(searchResponse.getHits().getTotalHits(), is(greaterThanOrEqualTo(1L))); + }, 5, TimeUnit.SECONDS); + } + +} diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java index 6680b38ab94b3..db1d3767b59cd 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java @@ -35,9 +35,7 @@ import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.daily; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.weekly; -import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.startsWith; import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Mockito.mock; @@ -110,40 +108,6 @@ public void accept(Iterable events) { assertThat(bits.cardinality(), is(count)); } - public void testStartClearsExistingSchedules() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - List firedWatchIds = new ArrayList<>(); - engine.register(new Consumer>() { - @Override - public void accept(Iterable events) { - for (TriggerEvent event : events) { - firedWatchIds.add(event.jobName()); - } - latch.countDown(); - } - }); - - int count = randomIntBetween(2, 5); - List watches = new ArrayList<>(); - for (int i = 0; i < count; i++) { - watches.add(createWatch(String.valueOf(i), interval("1s"))); - } - engine.start(watches); - - watches.clear(); - for (int i = 0; i < count; i++) { - watches.add(createWatch("another_id" + i, interval("1s"))); - } - engine.start(watches); - - advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); - if (!latch.await(3 * count, TimeUnit.SECONDS)) { - fail("waiting too long for all watches to be triggered"); - } - - assertThat(firedWatchIds, everyItem(startsWith("another_id"))); - } - public void testAddHourly() throws Exception { final String name = "job_name"; final CountDownLatch latch = new CountDownLatch(1); diff --git a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java index 17fbf0769fd47..25b19aeea3ba5 100644 --- a/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java +++ b/x-pack/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityIT.java @@ -59,20 +59,20 @@ public void startWatcher() throws Exception { String state = objectPath.evaluate("stats.0.watcher_state"); switch (state) { - case "stopped": - Response startResponse = adminClient().performRequest(new Request("POST", "/_xpack/watcher/_start")); - String body = EntityUtils.toString(startResponse.getEntity()); - assertThat(body, containsString("\"acknowledged\":true")); - break; - case "stopping": - throw new AssertionError("waiting until stopping state reached stopped state to start again"); - case "starting": - throw new AssertionError("waiting until starting state reached started state"); - case "started": - // all good here, we are done - break; - default: - throw new AssertionError("unknown state[" + state + "]"); + case "stopped": + Response startResponse = adminClient().performRequest(new Request("POST", "/_xpack/watcher/_start")); + Map responseMap = entityAsMap(startResponse); + assertThat(responseMap, hasEntry("acknowledged", true)); + break; + case "stopping": + throw new AssertionError("waiting until stopping state reached stopped state to start again"); + case "starting": + throw new AssertionError("waiting until starting state reached started state"); + case "started": + // all good here, we are done + break; + default: + throw new AssertionError("unknown state[" + state + "]"); } } catch (IOException e) { throw new AssertionError(e); @@ -135,7 +135,6 @@ protected Settings restAdminSettings() { } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33320") public void testSearchInputHasPermissions() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -159,7 +158,6 @@ public void testSearchInputHasPermissions() throws Exception { assertThat(conditionMet, is(true)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29893") public void testSearchInputWithInsufficientPrivileges() throws Exception { String indexName = "index_not_allowed_to_read"; try (XContentBuilder builder = jsonBuilder()) { @@ -186,7 +184,6 @@ public void testSearchInputWithInsufficientPrivileges() throws Exception { assertThat(conditionMet, is(false)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33320") public void testSearchTransformHasPermissions() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -216,7 +213,6 @@ public void testSearchTransformHasPermissions() throws Exception { assertThat(value, is("15")); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33291") public void testSearchTransformInsufficientPermissions() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -244,7 +240,6 @@ public void testSearchTransformInsufficientPermissions() throws Exception { assertThat(response.getStatusLine().getStatusCode(), is(404)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30777") public void testIndexActionHasPermissions() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -269,7 +264,6 @@ public void testIndexActionHasPermissions() throws Exception { assertThat(spam, is("eggs")); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33320") public void testIndexActionInsufficientPrivileges() throws Exception { try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); @@ -299,6 +293,8 @@ private void indexWatch(String watchId, XContentBuilder builder) throws Exceptio Response response = client().performRequest(request); Map responseMap = entityAsMap(response); assertThat(responseMap, hasEntry("_id", watchId)); + assertThat(responseMap, hasEntry("created", true)); + assertThat(responseMap, hasEntry("_version", 1)); } private ObjectPath getWatchHistoryEntry(String watchId) throws Exception {