diff --git a/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/history/HistoryTemplateTransformMappingsTests.java b/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/history/HistoryTemplateTransformMappingsTests.java index 6efcbd87f0b33..cfca0bde383ad 100644 --- a/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/history/HistoryTemplateTransformMappingsTests.java +++ b/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/history/HistoryTemplateTransformMappingsTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.watcher.history; import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchRequestBuilder; @@ -44,18 +45,25 @@ public void testTransformFields() throws Exception { ) ); - client().prepareBulk() - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .add( - prepareIndex("idx").setId("1").setSource(jsonBuilder().startObject().field("name", "first").field("foo", "bar").endObject()) - ) - .add( - prepareIndex("idx").setId("2") - .setSource( - jsonBuilder().startObject().field("name", "second").startObject("foo").field("what", "ever").endObject().endObject() - ) - ) - .get(); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .add( + prepareIndex("idx").setId("1") + .setSource(jsonBuilder().startObject().field("name", "first").field("foo", "bar").endObject()) + ) + .add( + prepareIndex("idx").setId("2") + .setSource( + jsonBuilder().startObject() + .field("name", "second") + .startObject("foo") + .field("what", "ever") + .endObject() + .endObject() + ) + ) + .get(); + } new PutWatchRequestBuilder(client(), "_first").setSource( watchBuilder().trigger(schedule(interval("5s"))) diff --git a/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java b/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java index 99640d1ebc3ea..e4929ebea74c2 100644 --- a/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java +++ b/x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java @@ -169,29 +169,30 @@ public void testLoadExistingWatchesUponStartup() throws Exception { int numWatches = scaledRandomIntBetween(16, 128); WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "value")), "my-index"); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - for (int i = 0; i < numWatches; i++) { - bulkRequestBuilder.add( - prepareIndex(Watch.INDEX).setId("_id" + i) - .setSource( - watchBuilder().trigger(schedule(cron("0 0/5 * * * ? 2050"))) - .input(searchInput(request)) - .condition(new CompareCondition("ctx.payload.hits.total.value", CompareCondition.Op.EQ, 1L)) - .buildAsBytes(XContentType.JSON), - XContentType.JSON - ) - .setWaitForActiveShards(ActiveShardCount.ALL) - ); - } - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - assertHitCount(prepareSearch(Watch.INDEX).setSize(0), numWatches); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + for (int i = 0; i < numWatches; i++) { + bulkRequestBuilder.add( + prepareIndex(Watch.INDEX).setId("_id" + i) + .setSource( + watchBuilder().trigger(schedule(cron("0 0/5 * * * ? 2050"))) + .input(searchInput(request)) + .condition(new CompareCondition("ctx.payload.hits.total.value", CompareCondition.Op.EQ, 1L)) + .buildAsBytes(XContentType.JSON), + XContentType.JSON + ) + .setWaitForActiveShards(ActiveShardCount.ALL) + ); + } + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + assertHitCount(prepareSearch(Watch.INDEX).setSize(0), numWatches); - startWatcher(); + startWatcher(); - assertBusy(() -> { - WatcherStatsResponse response = new WatcherStatsRequestBuilder(client()).get(); - assertThat(response.getWatchesCount(), equalTo((long) numWatches)); - }); + assertBusy(() -> { + WatcherStatsResponse response = new WatcherStatsRequestBuilder(client()).get(); + assertThat(response.getWatchesCount(), equalTo((long) numWatches)); + }); + } } public void testMixedTriggeredWatchLoading() throws Exception { @@ -222,24 +223,25 @@ public void testMixedTriggeredWatchLoading() throws Exception { ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); final int numRecords = scaledRandomIntBetween(numWatches, 128); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - for (int i = 0; i < numRecords; i++) { - String watchId = "_id" + (i % numWatches); - now = now.plusMinutes(1); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now); - Wid wid = new Wid(watchId, now); - TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event); - bulkRequestBuilder.add( - prepareIndex(TriggeredWatchStoreField.INDEX_NAME).setId(triggeredWatch.id().value()) - .setSource(jsonBuilder().value(triggeredWatch)) - .request() - ); - } - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + for (int i = 0; i < numRecords; i++) { + String watchId = "_id" + (i % numWatches); + now = now.plusMinutes(1); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now); + Wid wid = new Wid(watchId, now); + TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event); + bulkRequestBuilder.add( + prepareIndex(TriggeredWatchStoreField.INDEX_NAME).setId(triggeredWatch.id().value()) + .setSource(jsonBuilder().value(triggeredWatch)) + .request() + ); + } + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - logger.info("Added [{}] triggered watches for [{}] different watches, starting watcher again", numRecords, numWatches); - startWatcher(); - assertSingleExecutionAndCompleteWatchHistory(numWatches, numRecords); + logger.info("Added [{}] triggered watches for [{}] different watches, starting watcher again", numRecords, numWatches); + startWatcher(); + assertSingleExecutionAndCompleteWatchHistory(numWatches, numRecords); + } } public void testTriggeredWatchLoading() throws Exception { @@ -266,23 +268,24 @@ public void testTriggeredWatchLoading() throws Exception { ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); final int numRecords = scaledRandomIntBetween(2, 12); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - for (int i = 0; i < numRecords; i++) { - now = now.plusMinutes(1); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now); - Wid wid = new Wid(watchId, now); - TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event); - bulkRequestBuilder.add( - prepareIndex(TriggeredWatchStoreField.INDEX_NAME).setId(triggeredWatch.id().value()) - .setSource(jsonBuilder().value(triggeredWatch)) - .setWaitForActiveShards(ActiveShardCount.ALL) - ); - } - bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + for (int i = 0; i < numRecords; i++) { + now = now.plusMinutes(1); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now); + Wid wid = new Wid(watchId, now); + TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event); + bulkRequestBuilder.add( + prepareIndex(TriggeredWatchStoreField.INDEX_NAME).setId(triggeredWatch.id().value()) + .setSource(jsonBuilder().value(triggeredWatch)) + .setWaitForActiveShards(ActiveShardCount.ALL) + ); + } + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - startWatcher(); + startWatcher(); - assertSingleExecutionAndCompleteWatchHistory(1, numRecords); + assertSingleExecutionAndCompleteWatchHistory(1, numRecords); + } } private void assertSingleExecutionAndCompleteWatchHistory(final long numberOfWatches, final int expectedWatchHistoryCount) @@ -342,47 +345,48 @@ public void testWatchRecordSavedTwice() throws Exception { logger.info("Stopping watcher"); stopWatcher(); - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - int numRecords = scaledRandomIntBetween(8, 32); - for (int i = 0; i < numRecords; i++) { - String watchId = Integer.toString(i); - ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, triggeredTime, triggeredTime); - Wid wid = new Wid(watchId, triggeredTime); - TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event); - bulkRequestBuilder.add( - prepareIndex(TriggeredWatchStoreField.INDEX_NAME).setId(triggeredWatch.id().value()) - .setSource(jsonBuilder().value(triggeredWatch)) - ); - - String id = internalCluster().getInstance(ClusterService.class).localNode().getId(); - WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, "executed", id); - bulkRequestBuilder.add( - prepareIndex(HistoryStoreField.DATA_STREAM).setId(watchRecord.id().value()) - .setOpType(DocWriteRequest.OpType.CREATE) - .setSource(jsonBuilder().value(watchRecord)) - ); - } - assertNoFailures(bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get()); - - logger.info("Starting watcher"); - startWatcher(); - - assertBusy(() -> { - // We need to wait until all the records are processed from the internal execution queue, only then we can assert - // that numRecords watch records have been processed as part of starting up. - WatcherStatsResponse response = new WatcherStatsRequestBuilder(client()).setIncludeCurrentWatches(true).get(); - long maxSize = response.getNodes().stream().map(WatcherStatsResponse.Node::getSnapshots).mapToLong(List::size).sum(); - assertThat(maxSize, equalTo(0L)); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + int numRecords = scaledRandomIntBetween(8, 32); + for (int i = 0; i < numRecords; i++) { + String watchId = Integer.toString(i); + ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, triggeredTime, triggeredTime); + Wid wid = new Wid(watchId, triggeredTime); + TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event); + bulkRequestBuilder.add( + prepareIndex(TriggeredWatchStoreField.INDEX_NAME).setId(triggeredWatch.id().value()) + .setSource(jsonBuilder().value(triggeredWatch)) + ); - // but even then since the execution of the watch record is async it may take a little bit before - // the actual documents are in the output index - refresh(); - assertResponse(prepareSearch(HistoryStoreField.DATA_STREAM).setSize(numRecords), searchResponse -> { - assertThat(searchResponse.getHits().getTotalHits().value, Matchers.equalTo((long) numRecords)); - for (int i = 0; i < numRecords; i++) { - assertThat(searchResponse.getHits().getAt(i).getSourceAsMap().get("state"), is(ExecutionState.EXECUTED.id())); - } + String id = internalCluster().getInstance(ClusterService.class).localNode().getId(); + WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, "executed", id); + bulkRequestBuilder.add( + prepareIndex(HistoryStoreField.DATA_STREAM).setId(watchRecord.id().value()) + .setOpType(DocWriteRequest.OpType.CREATE) + .setSource(jsonBuilder().value(watchRecord)) + ); + } + assertNoFailures(bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get()); + + logger.info("Starting watcher"); + startWatcher(); + + assertBusy(() -> { + // We need to wait until all the records are processed from the internal execution queue, only then we can assert + // that numRecords watch records have been processed as part of starting up. + WatcherStatsResponse response = new WatcherStatsRequestBuilder(client()).setIncludeCurrentWatches(true).get(); + long maxSize = response.getNodes().stream().map(WatcherStatsResponse.Node::getSnapshots).mapToLong(List::size).sum(); + assertThat(maxSize, equalTo(0L)); + + // but even then since the execution of the watch record is async it may take a little bit before + // the actual documents are in the output index + refresh(); + assertResponse(prepareSearch(HistoryStoreField.DATA_STREAM).setSize(numRecords), searchResponse -> { + assertThat(searchResponse.getHits().getTotalHits().value, Matchers.equalTo((long) numRecords)); + for (int i = 0; i < numRecords; i++) { + assertThat(searchResponse.getHits().getAt(i).getSourceAsMap().get("state"), is(ExecutionState.EXECUTED.id())); + } + }); }); - }); + } } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java index d2b38f4b11ef8..0515f38cb96a1 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java @@ -79,7 +79,8 @@ public void putAll(final List triggeredWatches, final ActionList return; } - client.bulk(createBulkRequest(triggeredWatches), listener); + BulkRequest bulkRequest = createBulkRequest(triggeredWatches); + client.bulk(bulkRequest, ActionListener.releaseAfter(listener, bulkRequest)); } public BulkResponse putAll(final List triggeredWatches) throws IOException {