Skip to content

Commit

Permalink
Fix Watcher resource handling: close BulkRequestBuilder instances with try-with-resources and release the BulkRequest after the async client.bulk call
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 4, 2023
1 parent 67f6647 commit ce71b8f
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.watcher.history;

import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchRequestBuilder;
Expand Down Expand Up @@ -44,18 +45,25 @@ public void testTransformFields() throws Exception {
)
);

client().prepareBulk()
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(
prepareIndex("idx").setId("1").setSource(jsonBuilder().startObject().field("name", "first").field("foo", "bar").endObject())
)
.add(
prepareIndex("idx").setId("2")
.setSource(
jsonBuilder().startObject().field("name", "second").startObject("foo").field("what", "ever").endObject().endObject()
)
)
.get();
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(
prepareIndex("idx").setId("1")
.setSource(jsonBuilder().startObject().field("name", "first").field("foo", "bar").endObject())
)
.add(
prepareIndex("idx").setId("2")
.setSource(
jsonBuilder().startObject()
.field("name", "second")
.startObject("foo")
.field("what", "ever")
.endObject()
.endObject()
)
)
.get();
}

new PutWatchRequestBuilder(client(), "_first").setSource(
watchBuilder().trigger(schedule(interval("5s")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,29 +169,30 @@ public void testLoadExistingWatchesUponStartup() throws Exception {
int numWatches = scaledRandomIntBetween(16, 128);
WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "value")), "my-index");

BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i = 0; i < numWatches; i++) {
bulkRequestBuilder.add(
prepareIndex(Watch.INDEX).setId("_id" + i)
.setSource(
watchBuilder().trigger(schedule(cron("0 0/5 * * * ? 2050")))
.input(searchInput(request))
.condition(new CompareCondition("ctx.payload.hits.total.value", CompareCondition.Op.EQ, 1L))
.buildAsBytes(XContentType.JSON),
XContentType.JSON
)
.setWaitForActiveShards(ActiveShardCount.ALL)
);
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
assertHitCount(prepareSearch(Watch.INDEX).setSize(0), numWatches);
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
for (int i = 0; i < numWatches; i++) {
bulkRequestBuilder.add(
prepareIndex(Watch.INDEX).setId("_id" + i)
.setSource(
watchBuilder().trigger(schedule(cron("0 0/5 * * * ? 2050")))
.input(searchInput(request))
.condition(new CompareCondition("ctx.payload.hits.total.value", CompareCondition.Op.EQ, 1L))
.buildAsBytes(XContentType.JSON),
XContentType.JSON
)
.setWaitForActiveShards(ActiveShardCount.ALL)
);
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
assertHitCount(prepareSearch(Watch.INDEX).setSize(0), numWatches);

startWatcher();
startWatcher();

assertBusy(() -> {
WatcherStatsResponse response = new WatcherStatsRequestBuilder(client()).get();
assertThat(response.getWatchesCount(), equalTo((long) numWatches));
});
assertBusy(() -> {
WatcherStatsResponse response = new WatcherStatsRequestBuilder(client()).get();
assertThat(response.getWatchesCount(), equalTo((long) numWatches));
});
}
}

public void testMixedTriggeredWatchLoading() throws Exception {
Expand Down Expand Up @@ -222,24 +223,25 @@ public void testMixedTriggeredWatchLoading() throws Exception {

ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
final int numRecords = scaledRandomIntBetween(numWatches, 128);
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i = 0; i < numRecords; i++) {
String watchId = "_id" + (i % numWatches);
now = now.plusMinutes(1);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now);
Wid wid = new Wid(watchId, now);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
bulkRequestBuilder.add(
prepareIndex(TriggeredWatchStoreField.INDEX_NAME).setId(triggeredWatch.id().value())
.setSource(jsonBuilder().value(triggeredWatch))
.request()
);
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
for (int i = 0; i < numRecords; i++) {
String watchId = "_id" + (i % numWatches);
now = now.plusMinutes(1);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now);
Wid wid = new Wid(watchId, now);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
bulkRequestBuilder.add(
prepareIndex(TriggeredWatchStoreField.INDEX_NAME).setId(triggeredWatch.id().value())
.setSource(jsonBuilder().value(triggeredWatch))
.request()
);
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

logger.info("Added [{}] triggered watches for [{}] different watches, starting watcher again", numRecords, numWatches);
startWatcher();
assertSingleExecutionAndCompleteWatchHistory(numWatches, numRecords);
logger.info("Added [{}] triggered watches for [{}] different watches, starting watcher again", numRecords, numWatches);
startWatcher();
assertSingleExecutionAndCompleteWatchHistory(numWatches, numRecords);
}
}

public void testTriggeredWatchLoading() throws Exception {
Expand All @@ -266,23 +268,24 @@ public void testTriggeredWatchLoading() throws Exception {

ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
final int numRecords = scaledRandomIntBetween(2, 12);
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i = 0; i < numRecords; i++) {
now = now.plusMinutes(1);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now);
Wid wid = new Wid(watchId, now);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
bulkRequestBuilder.add(
prepareIndex(TriggeredWatchStoreField.INDEX_NAME).setId(triggeredWatch.id().value())
.setSource(jsonBuilder().value(triggeredWatch))
.setWaitForActiveShards(ActiveShardCount.ALL)
);
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
for (int i = 0; i < numRecords; i++) {
now = now.plusMinutes(1);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, now, now);
Wid wid = new Wid(watchId, now);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
bulkRequestBuilder.add(
prepareIndex(TriggeredWatchStoreField.INDEX_NAME).setId(triggeredWatch.id().value())
.setSource(jsonBuilder().value(triggeredWatch))
.setWaitForActiveShards(ActiveShardCount.ALL)
);
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

startWatcher();
startWatcher();

assertSingleExecutionAndCompleteWatchHistory(1, numRecords);
assertSingleExecutionAndCompleteWatchHistory(1, numRecords);
}
}

private void assertSingleExecutionAndCompleteWatchHistory(final long numberOfWatches, final int expectedWatchHistoryCount)
Expand Down Expand Up @@ -342,47 +345,48 @@ public void testWatchRecordSavedTwice() throws Exception {
logger.info("Stopping watcher");
stopWatcher();

BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
int numRecords = scaledRandomIntBetween(8, 32);
for (int i = 0; i < numRecords; i++) {
String watchId = Integer.toString(i);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, triggeredTime, triggeredTime);
Wid wid = new Wid(watchId, triggeredTime);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
bulkRequestBuilder.add(
prepareIndex(TriggeredWatchStoreField.INDEX_NAME).setId(triggeredWatch.id().value())
.setSource(jsonBuilder().value(triggeredWatch))
);

String id = internalCluster().getInstance(ClusterService.class).localNode().getId();
WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, "executed", id);
bulkRequestBuilder.add(
prepareIndex(HistoryStoreField.DATA_STREAM).setId(watchRecord.id().value())
.setOpType(DocWriteRequest.OpType.CREATE)
.setSource(jsonBuilder().value(watchRecord))
);
}
assertNoFailures(bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get());

logger.info("Starting watcher");
startWatcher();

assertBusy(() -> {
// We need to wait until all the records are processed from the internal execution queue, only then we can assert
// that numRecords watch records have been processed as part of starting up.
WatcherStatsResponse response = new WatcherStatsRequestBuilder(client()).setIncludeCurrentWatches(true).get();
long maxSize = response.getNodes().stream().map(WatcherStatsResponse.Node::getSnapshots).mapToLong(List::size).sum();
assertThat(maxSize, equalTo(0L));
try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) {
int numRecords = scaledRandomIntBetween(8, 32);
for (int i = 0; i < numRecords; i++) {
String watchId = Integer.toString(i);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watchId, triggeredTime, triggeredTime);
Wid wid = new Wid(watchId, triggeredTime);
TriggeredWatch triggeredWatch = new TriggeredWatch(wid, event);
bulkRequestBuilder.add(
prepareIndex(TriggeredWatchStoreField.INDEX_NAME).setId(triggeredWatch.id().value())
.setSource(jsonBuilder().value(triggeredWatch))
);

// but even then since the execution of the watch record is async it may take a little bit before
// the actual documents are in the output index
refresh();
assertResponse(prepareSearch(HistoryStoreField.DATA_STREAM).setSize(numRecords), searchResponse -> {
assertThat(searchResponse.getHits().getTotalHits().value, Matchers.equalTo((long) numRecords));
for (int i = 0; i < numRecords; i++) {
assertThat(searchResponse.getHits().getAt(i).getSourceAsMap().get("state"), is(ExecutionState.EXECUTED.id()));
}
String id = internalCluster().getInstance(ClusterService.class).localNode().getId();
WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, "executed", id);
bulkRequestBuilder.add(
prepareIndex(HistoryStoreField.DATA_STREAM).setId(watchRecord.id().value())
.setOpType(DocWriteRequest.OpType.CREATE)
.setSource(jsonBuilder().value(watchRecord))
);
}
assertNoFailures(bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get());

logger.info("Starting watcher");
startWatcher();

assertBusy(() -> {
// We need to wait until all the records are processed from the internal execution queue, only then we can assert
// that numRecords watch records have been processed as part of starting up.
WatcherStatsResponse response = new WatcherStatsRequestBuilder(client()).setIncludeCurrentWatches(true).get();
long maxSize = response.getNodes().stream().map(WatcherStatsResponse.Node::getSnapshots).mapToLong(List::size).sum();
assertThat(maxSize, equalTo(0L));

// but even then since the execution of the watch record is async it may take a little bit before
// the actual documents are in the output index
refresh();
assertResponse(prepareSearch(HistoryStoreField.DATA_STREAM).setSize(numRecords), searchResponse -> {
assertThat(searchResponse.getHits().getTotalHits().value, Matchers.equalTo((long) numRecords));
for (int i = 0; i < numRecords; i++) {
assertThat(searchResponse.getHits().getAt(i).getSourceAsMap().get("state"), is(ExecutionState.EXECUTED.id()));
}
});
});
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public void putAll(final List<TriggeredWatch> triggeredWatches, final ActionList
return;
}

client.bulk(createBulkRequest(triggeredWatches), listener);
BulkRequest bulkRequest = createBulkRequest(triggeredWatches);
client.bulk(bulkRequest, ActionListener.releaseAfter(listener, bulkRequest));
}

public BulkResponse putAll(final List<TriggeredWatch> triggeredWatches) throws IOException {
Expand Down

0 comments on commit ce71b8f

Please sign in to comment.