Skip to content

Commit

Permalink
Watcher: Store username on watch execution (#31873)
Browse files Browse the repository at this point in the history
There is currently no way to see what user executed a watch. This commit
adds the decrypted username to each execution in the watch history, in a
new field "user".

Closes #31772
  • Loading branch information
hub-cap committed Jul 23, 2018
1 parent 5942ae7 commit b982e1a
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 10 deletions.
4 changes: 3 additions & 1 deletion x-pack/docs/en/rest-api/watcher/execute-watch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ This is an example of the output:
"type": "index"
}
]
}
},
"user": "test_admin" <4>
}
}
--------------------------------------------------
Expand All @@ -281,6 +282,7 @@ This is an example of the output:
<1> The id of the watch record as it would be stored in the `.watcher-history` index.
<2> The watch record document as it would be stored in the `.watcher-history` index.
<3> The watch execution results.
<4> The user used to execute the watch.

You can set a different execution mode for every action by associating the mode
name with the action id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,17 @@ static Authentication deserializeHeaderAndPutInContext(String header, ThreadCont
throws IOException, IllegalArgumentException {
assert ctx.getTransient(AuthenticationField.AUTHENTICATION_KEY) == null;

Authentication authentication = decode(header);
ctx.putTransient(AuthenticationField.AUTHENTICATION_KEY, authentication);
return authentication;
}

public static Authentication decode(String header) throws IOException {
byte[] bytes = Base64.getDecoder().decode(header);
StreamInput input = StreamInput.wrap(bytes);
Version version = Version.readVersion(input);
input.setVersion(version);
Authentication authentication = new Authentication(input);
ctx.putTransient(AuthenticationField.AUTHENTICATION_KEY, authentication);
return authentication;
return new Authentication(input);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
import org.elasticsearch.xpack.core.watcher.condition.Condition;
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
Expand All @@ -18,6 +20,7 @@
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.joda.time.DateTime;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -42,6 +45,7 @@ public abstract class WatchExecutionContext {
private Transform.Result transformResult;
private ConcurrentMap<String, ActionWrapperResult> actionsResults = ConcurrentCollections.newConcurrentMap();
private String nodeId;
private String user;

public WatchExecutionContext(String watchId, DateTime executionTime, TriggerEvent triggerEvent, TimeValue defaultThrottlePeriod) {
this.id = new Wid(watchId, executionTime);
Expand Down Expand Up @@ -84,6 +88,7 @@ public Watch watch() {
public final void ensureWatchExists(CheckedSupplier<Watch, Exception> supplier) throws Exception {
if (watch == null) {
watch = supplier.get();
user = WatchExecutionContext.getUsernameFromWatch(watch);
}
}

Expand Down Expand Up @@ -136,6 +141,11 @@ public String getNodeId() {
return nodeId;
}

/**
* @return The user that executes the watch, which will be stored in the watch history
*/
public String getUser() { return user; }

public void start() {
assert phase == ExecutionPhase.AWAITS_EXECUTION;
startTimestamp = System.currentTimeMillis();
Expand Down Expand Up @@ -242,4 +252,19 @@ public WatchRecord finish() {
public WatchExecutionSnapshot createSnapshot(Thread executionThread) {
return new WatchExecutionSnapshot(this, executionThread.getStackTrace());
}

/**
* Given a watch, this extracts and decodes the relevant auth header and returns the principal of the user that is
* executing the watch.
*/
public static String getUsernameFromWatch(Watch watch) throws IOException {
if (watch != null && watch.status() != null && watch.status().getHeaders() != null) {
String header = watch.status().getHeaders().get(AuthenticationField.AUTHENTICATION_KEY);
if (header != null) {
Authentication auth = Authentication.decode(header);
return auth.getUser().principal();
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ public abstract class WatchRecord implements ToXContentObject {
private static final ParseField METADATA = new ParseField("metadata");
private static final ParseField EXECUTION_RESULT = new ParseField("result");
private static final ParseField EXCEPTION = new ParseField("exception");
private static final ParseField USER = new ParseField("user");

protected final Wid id;
protected final Watch watch;
private final String nodeId;
protected final TriggerEvent triggerEvent;
protected final ExecutionState state;
private final String user;

// only emitted to xcontent in "debug" mode
protected final Map<String, Object> vars;
Expand All @@ -60,7 +62,7 @@ public abstract class WatchRecord implements ToXContentObject {

private WatchRecord(Wid id, TriggerEvent triggerEvent, ExecutionState state, Map<String, Object> vars, ExecutableInput input,
ExecutableCondition condition, Map<String, Object> metadata, Watch watch, WatchExecutionResult executionResult,
String nodeId) {
String nodeId, String user) {
this.id = id;
this.triggerEvent = triggerEvent;
this.state = state;
Expand All @@ -71,15 +73,16 @@ private WatchRecord(Wid id, TriggerEvent triggerEvent, ExecutionState state, Map
this.executionResult = executionResult;
this.watch = watch;
this.nodeId = nodeId;
this.user = user;
}

private WatchRecord(Wid id, TriggerEvent triggerEvent, ExecutionState state, String nodeId) {
this(id, triggerEvent, state, Collections.emptyMap(), null, null, null, null, null, nodeId);
this(id, triggerEvent, state, Collections.emptyMap(), null, null, null, null, null, nodeId, null);
}

private WatchRecord(WatchRecord record, ExecutionState state) {
this(record.id, record.triggerEvent, state, record.vars, record.input, record.condition, record.metadata, record.watch,
record.executionResult, record.nodeId);
record.executionResult, record.nodeId, record.user);
}

private WatchRecord(WatchExecutionContext context, ExecutionState state) {
Expand All @@ -88,12 +91,13 @@ private WatchRecord(WatchExecutionContext context, ExecutionState state) {
context.watch() != null ? context.watch().condition() : null,
context.watch() != null ? context.watch().metadata() : null,
context.watch(),
null, context.getNodeId());
null, context.getNodeId(), context.getUser());
}

private WatchRecord(WatchExecutionContext context, WatchExecutionResult executionResult) {
this(context.id(), context.triggerEvent(), getState(executionResult), context.vars(), context.watch().input(),
context.watch().condition(), context.watch().metadata(), context.watch(), executionResult, context.getNodeId());
context.watch().condition(), context.watch().metadata(), context.watch(), executionResult, context.getNodeId(),
context.getUser());
}

public static ExecutionState getState(WatchExecutionResult executionResult) {
Expand Down Expand Up @@ -152,6 +156,9 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
builder.field(NODE.getPreferredName(), nodeId);
builder.field(STATE.getPreferredName(), state.id());

if (user != null) {
builder.field(USER.getPreferredName(), user);
}
if (watch != null && watch.status() != null) {
builder.field(STATUS.getPreferredName(), watch.status(), params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ public final class WatcherIndexTemplateRegistryField {
// version 6: upgrade to ES 6, removal of _status field
// version 7: add full exception stack traces for better debugging
// version 8: fix slack attachment property not to be dynamic, causing field type issues
// version 9: add a user field defining which user executed the watch
// Note: if you change this, also inform the kibana team around the watcher-ui
public static final String INDEX_TEMPLATE_VERSION = "8";
public static final String INDEX_TEMPLATE_VERSION = "9";
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";
public static final String WATCHES_TEMPLATE_NAME = ".watches";
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugin/core/src/main/resources/watch-history.json
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@
"messages": {
"type": "text"
},
"user": {
"type": "text"
},
"exception" : {
"type" : "object",
"enabled" : false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
import org.elasticsearch.xpack.core.security.user.User;
import org.elasticsearch.xpack.core.watcher.actions.Action;
import org.elasticsearch.xpack.core.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
Expand Down Expand Up @@ -85,6 +88,7 @@
import static java.util.Arrays.asList;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -1061,6 +1065,33 @@ public void testManualWatchExecutionContextGetsAlwaysExecuted() throws Exception
assertThat(watchRecord.state(), is(ExecutionState.EXECUTED));
}

public void testLoadingWatchExecutionUser() throws Exception {
DateTime now = now(UTC);
Watch watch = mock(Watch.class);
WatchStatus status = mock(WatchStatus.class);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);

// Should be null
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
context.ensureWatchExists(() -> watch);
assertNull(context.getUser());

// Should still be null, header is not yet set
when(watch.status()).thenReturn(status);
context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
context.ensureWatchExists(() -> watch);
assertNull(context.getUser());

Authentication authentication = new Authentication(new User("joe", "admin"),
new Authentication.RealmRef("native_realm", "native", "node1"), null);

// Should no longer be null now that the proper header is set
when(status.getHeaders()).thenReturn(Collections.singletonMap(AuthenticationField.AUTHENTICATION_KEY, authentication.encode()));
context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
context.ensureWatchExists(() -> watch);
assertThat(context.getUser(), equalTo("joe"));
}

private WatchExecutionContext createMockWatchExecutionContext(String watchId, DateTime executionTime) {
WatchExecutionContext ctx = mock(WatchExecutionContext.class);
when(ctx.id()).thenReturn(new Wid(watchId, executionTime));
Expand Down
2 changes: 2 additions & 0 deletions x-pack/qa/smoke-test-watcher-with-security/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ integTestCluster {
extraConfigFile 'roles.yml', 'roles.yml'
setupCommand 'setupTestAdminUser',
'bin/elasticsearch-users', 'useradd', 'test_admin', '-p', 'x-pack-test-password', '-r', 'superuser'
setupCommand 'setupXpackUserForTests',
'bin/elasticsearch-users', 'useradd', 'x_pack_rest_user', '-p', 'x-pack-test-password', '-r', 'watcher_manager'
setupCommand 'setupWatcherManagerUser',
'bin/elasticsearch-users', 'useradd', 'watcher_manager', '-p', 'x-pack-test-password', '-r', 'watcher_manager'
setupCommand 'setupPowerlessUser',
Expand Down
1 change: 1 addition & 0 deletions x-pack/qa/smoke-test-watcher-with-security/roles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ watcher_manager:
run_as:
- powerless_user
- watcher_manager
- x_pack_rest_user

watcher_monitor:
cluster:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,63 @@ teardown:
id: "my_watch"
- match: { watch_record.watch_id: "my_watch" }
- match: { watch_record.state: "executed" }
- match: { watch_record.user: "watcher_manager" }




---
"Test watch is runas user properly recorded":
- do:
xpack.watcher.put_watch:
id: "my_watch"
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"search" : {
"request" : {
"indices" : [ "my_test_index" ],
"body" :{
"query" : { "match_all": {} }
}
}
}
},
"condition" : {
"compare" : {
"ctx.payload.hits.total" : {
"gte" : 1
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Successfully ran my_watch to test for search input"
}
}
}
}
- match: { _id: "my_watch" }

- do:
xpack.watcher.get_watch:
id: "my_watch"
- match: { _id: "my_watch" }
- is_false: watch.status.headers

- do:
headers: { es-security-runas-user: x_pack_rest_user }
xpack.watcher.execute_watch:
id: "my_watch"
- match: { watch_record.watch_id: "my_watch" }
- match: { watch_record.state: "executed" }
- match: { watch_record.user: "x_pack_rest_user" }


---
"Test watch search input does not work against index user is not allowed to read":

Expand Down Expand Up @@ -130,6 +183,7 @@ teardown:
- match: { watch_record.watch_id: "my_watch" }
# because we are not allowed to read the index, there wont be any data
- match: { watch_record.state: "execution_not_needed" }
- match: { watch_record.user: "watcher_manager" }


---
Expand Down Expand Up @@ -272,6 +326,7 @@ teardown:
id: "my_watch"
- match: { watch_record.watch_id: "my_watch" }
- match: { watch_record.state: "executed" }
- match: { watch_record.user: "watcher_manager" }

- do:
get:
Expand Down Expand Up @@ -320,6 +375,7 @@ teardown:
id: "my_watch"
- match: { watch_record.watch_id: "my_watch" }
- match: { watch_record.state: "executed" }
- match: { watch_record.user: "watcher_manager" }

- do:
get:
Expand Down

0 comments on commit b982e1a

Please sign in to comment.