diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index 127425308b615..e2246214315b8 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -113,6 +113,7 @@ public void clusterChanged(ClusterChangedEvent event) { // if this is not a data node, we need to start it ourselves possibly if (event.state().nodes().getLocalNode().isDataNode() == false && isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) { + this.state.set(WatcherState.STARTING); watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED)); return; } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 49915674fe9e2..599287bb50a76 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -183,9 +183,6 @@ void reload(ClusterState state, String reason) { // by checking the cluster state version before and after loading the watches we can potentially just exit without applying the // changes processedClusterStateVersion.set(state.getVersion()); - triggerService.pauseExecution(); - int cancelledTaskCount = executionService.clearExecutionsAndQueue(); - logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false), e -> logger.error("error reloading watcher", e))); @@ -221,6 +218,7 @@ private synchronized boolean reloadInner(ClusterState state, String reason, bool if (processedClusterStateVersion.get() != state.getVersion()) { logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress", state.getVersion(), processedClusterStateVersion.get()); + return false; } Collection watches = loadWatches(state); @@ -231,7 +229,13 @@ private synchronized boolean reloadInner(ClusterState state, String reason, bool // if we had another state coming in the meantime, we will not start the trigger engines with these watches, but wait // until the others are loaded + // also this is the place where we pause the trigger service execution and clear the current execution service, so that we make sure + // that existing executions finish, but no new ones are executed if (processedClusterStateVersion.get() == state.getVersion()) { + triggerService.pauseExecution(); + int cancelledTaskCount = executionService.clearExecutionsAndQueue(); + logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount); + executionService.unPause(); triggerService.start(watches); if (triggeredWatches.isEmpty() == false) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java index 05aa7cf302817..4c10f794880b9 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleTriggerEngine.java @@ -56,7 +56,7 @@ public synchronized void start(Collection jobs) { schedules.put(job.id(), new ActiveSchedule(job.id(), trigger.getSchedule(), startTime)); } } - this.schedules.putAll(schedules); + this.schedules = schedules; } @Override diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java index 7949998867b48..6680b38ab94b3 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java @@ -35,7 +35,9 @@ import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.daily; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.weekly; +import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Mockito.mock; @@ -50,8 +52,12 @@ public void init() throws Exception { } private TriggerEngine createEngine() { - return new TickerScheduleTriggerEngine(Settings.EMPTY, - mock(ScheduleRegistry.class), clock); + Settings settings = Settings.EMPTY; + // having a low value here speeds up the tests tremendously, we still want to run with the defaults every now and then + if (usually()) { + settings = Settings.builder().put(TickerScheduleTriggerEngine.TICKER_INTERVAL_SETTING.getKey(), "10ms").build(); + } + return new TickerScheduleTriggerEngine(settings, mock(ScheduleRegistry.class), clock); } private void advanceClockIfNeeded(DateTime newCurrentDateTime) { @@ -104,6 +110,40 @@ public void accept(Iterable events) { assertThat(bits.cardinality(), is(count)); } + public void testStartClearsExistingSchedules() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + List firedWatchIds = new ArrayList<>(); + engine.register(new Consumer>() { + @Override + public void accept(Iterable events) { + for (TriggerEvent event : events) { + firedWatchIds.add(event.jobName()); + } + latch.countDown(); + } + }); + + int count = randomIntBetween(2, 5); + List watches = new ArrayList<>(); + for (int i = 0; i < count; i++) { + watches.add(createWatch(String.valueOf(i), interval("1s"))); + } + engine.start(watches); + + watches.clear(); + for (int i = 0; i < count; i++) { + watches.add(createWatch("another_id" + i, interval("1s"))); + } + engine.start(watches); + + advanceClockIfNeeded(new DateTime(clock.millis(), UTC).plusMillis(1100)); + if (!latch.await(3 * count, TimeUnit.SECONDS)) { + fail("waiting too long for all watches to be triggered"); + } + + assertThat(firedWatchIds, everyItem(startsWith("another_id"))); + } + public void testAddHourly() throws Exception { final String name = "job_name"; final CountDownLatch latch = new CountDownLatch(1);