Skip to content

Commit

Permalink
Watcher: Fix put watch action (#31524)
Browse files Browse the repository at this point in the history
If no version is specified when putting a watch, the index API should be
used instead of the update API, so that the whole watch gets overwritten
instead of being merged with the existing one.

Merging only happens when a version is specified, so that credentials can be omitted, which is important for the watcher UI.
  • Loading branch information
spinscale committed Jun 25, 2018
1 parent e318c3c commit 3814b85
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
---
"Test put watch api without version overwrites watch":
- do:
cluster.health:
wait_for_status: yellow

- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule": {
"hourly": {
"minute": [ 0, 5 ]
}
}
},
"input": {
"simple": {
"foo": "bar"
}
},
"actions": {
"logging": {
"logging": {
"text": "yaml test"
}
}
}
}
- match: { _id: "my_watch" }

- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { watch.input.simple.foo: "bar" }

# change the simple input fields, then ensure the old
# field does not exist on get
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule": {
"hourly": {
"minute": [ 0, 5 ]
}
}
},
"input": {
"simple": {
"spam": "eggs"
}
},
"actions": {
"logging": {
"logging": {
"text": "yaml test"
}
}
}
}
- match: { _id: "my_watch" }

- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { watch.input.simple.spam: "eggs" }
- is_false: watch.input.simple.foo

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
Expand Down Expand Up @@ -99,27 +101,45 @@ protected void masterOperation(PutWatchRequest request, ClusterState state,
try (XContentBuilder builder = jsonBuilder()) {
watch.toXContent(builder, DEFAULT_PARAMS);

UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
updateRequest.docAsUpsert(isUpdate == false);
updateRequest.version(request.getVersion());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(builder);
if (isUpdate) {
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
updateRequest.version(request.getVersion());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(builder);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest,
ActionListener.<UpdateResponse>wrap(response -> {
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest,
ActionListener.<UpdateResponse>wrap(response -> {
boolean created = response.getResult() == DocWriteResponse.Result.CREATED;
if (shouldBeTriggeredLocally(request, watch)) {
triggerService.add(watch);
}
listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created));
}, listener::onFailure),
client::update);
} else {
IndexRequest indexRequest = new IndexRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
indexRequest.source(builder);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, indexRequest,
ActionListener.<IndexResponse>wrap(response -> {
boolean created = response.getResult() == DocWriteResponse.Result.CREATED;
// if not yet in distributed mode (mixed 5/6 version in cluster), only trigger on the master node
if (localExecute(request) == false &&
this.clusterService.state().nodes().isLocalNodeElectedMaster() &&
watch.status().state().isActive()) {
if (shouldBeTriggeredLocally(request, watch)) {
triggerService.add(watch);
}
listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created));
}, listener::onFailure),
client::update);
client::index);
}
}
} catch (Exception e) {
listener.onFailure(e);
}
}

private boolean shouldBeTriggeredLocally(PutWatchRequest request, Watch watch) {
return localExecute(request) == false &&
this.clusterService.state().nodes().isLocalNodeElectedMaster() &&
watch.status().state().isActive();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public void testWatchesAreNotTriggeredOnNonMasterWhenNotDistributed() throws Exc
public void testWatchesAreTriggeredOnMasterWhenNotDistributed() throws Exception {
PutWatchRequest putWatchRequest = new PutWatchRequest();
putWatchRequest.setId("_id");
putWatchRequest.setVersion(randomLongBetween(1, 100));

ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster"))
.nodes(DiscoveryNodes.builder()
Expand Down

0 comments on commit 3814b85

Please sign in to comment.