Skip to content

Commit

Permalink
Use RoleRetrievalResult for better caching (#34197)
Browse files Browse the repository at this point in the history
Security caches the result of role lookups and negative lookups are
cached indefinitely. In the case of transient failures this leads to a
bad experience as the roles could truly exist. The CompositeRolesStore
needs to know if a failure occurred in one of the roles stores in order
to make the appropriate decision as it relates to caching. In order to
provide this information to the CompositeRolesStore, the return type of
methods to retrieve roles has changed to a new class,
RoleRetrievalResult. This class provides the ability to pass back an
exception to the roles store. This exception does not mean that a
request should be failed but instead serves as a signal to the roles
store that missing roles should not be cached and neither should the
combined role if there are missing roles.

As part of this, the negative lookup cache was also changed from an
unbounded cache to a cache with a configurable limit.

Relates #33205
  • Loading branch information
jaymode committed Oct 15, 2018
1 parent e93bc3c commit bbb606a
Show file tree
Hide file tree
Showing 17 changed files with 570 additions and 217 deletions.
15 changes: 15 additions & 0 deletions docs/reference/migration/migrate_6_5.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ your application to Elasticsearch 6.5.
* <<breaking_65_search_changes>>
* <<breaking_65_sql_changes>>
* <<breaking_65_client_changes>>
* <<breaking_65_security_changes>>

See also <<release-highlights>> and <<es-release-notes>>.

Expand Down Expand Up @@ -66,3 +67,17 @@ considered a single bucket.
`Settings` is no longer required by `Retry` so the `withBackoff` methods that
take `Settings` are now deprecated and there are new versions of those methods
that do not require `Settings`.

[float]
[[breaking_65_security_changes]]
=== Security changes

[float]
==== Custom role providers interface change

Custom role providers previously accepted an `ActionListener` with a response
type of `Set<RoleDescriptor>`, but in 6.5.0 the role provider requires an
`ActionListener` with a response type of `RoleRetrievalResult`. The
`RoleRetrievalResult` object accepts a `Set<RoleDescriptor>` if the provider
was successful; if the provider was not successful the `RoleRetrievalResult`
should be populated with the error.
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ protected XPackUsageResponse newResponse() {
}

@Override
protected void masterOperation(XPackUsageRequest request, ClusterState state, ActionListener<XPackUsageResponse> listener)
throws Exception {
protected void masterOperation(XPackUsageRequest request, ClusterState state, ActionListener<XPackUsageResponse> listener) {
final ActionListener<List<XPackFeatureSet.Usage>> usageActionListener = new ActionListener<List<Usage>>() {
@Override
public void onResponse(List<Usage> usages) {
Expand All @@ -73,7 +72,8 @@ public void onFailure(Exception e) {
@Override
public void onResponse(Usage usage) {
featureSetUsages.set(position.getAndIncrement(), usage);
iteratingListener.onResponse(null); // just send null back and keep iterating
// the value sent back doesn't matter since our predicate keeps iterating
iteratingListener.onResponse(Collections.emptyList());
}

@Override
Expand All @@ -84,13 +84,13 @@ public void onFailure(Exception e) {
};
IteratingActionListener<List<XPackFeatureSet.Usage>, XPackFeatureSet> iteratingActionListener =
new IteratingActionListener<>(usageActionListener, consumer, featureSets,
threadPool.getThreadContext(), () -> {
threadPool.getThreadContext(), (ignore) -> {
final List<Usage> usageList = new ArrayList<>(featureSetUsages.length());
for (int i = 0; i < featureSetUsages.length(); i++) {
usageList.add(featureSetUsages.get(i));
}
return usageList;
});
}, (ignore) -> true);
iteratingActionListener.run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
package org.elasticsearch.xpack.core.common;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.ThreadContext;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
Expand All @@ -32,7 +34,8 @@ public final class IteratingActionListener<T, U> implements ActionListener<T>, R
private final ActionListener<T> delegate;
private final BiConsumer<U, ActionListener<T>> consumer;
private final ThreadContext threadContext;
private final Supplier<T> consumablesFinishedResponse;
private final Function<T, T> finalResultFunction;
private final Predicate<T> iterationPredicate;

private int position = 0;

Expand All @@ -46,7 +49,7 @@ public final class IteratingActionListener<T, U> implements ActionListener<T>, R
*/
public IteratingActionListener(ActionListener<T> delegate, BiConsumer<U, ActionListener<T>> consumer, List<U> consumables,
ThreadContext threadContext) {
this(delegate, consumer, consumables, threadContext, null);
this(delegate, consumer, consumables, threadContext, Function.identity());
}

/**
Expand All @@ -56,18 +59,36 @@ public IteratingActionListener(ActionListener<T> delegate, BiConsumer<U, ActionL
* @param consumer the consumer that is executed for each consumable instance
* @param consumables the instances that can be consumed to produce a response which is ultimately sent on the delegate listener
* @param threadContext the thread context for the thread pool that created the listener
* @param consumablesFinishedResponse a supplier that maps the last consumable's response to a response
* to be sent on the delegate listener, in case the last consumable returns a
* {@code null} value, but the delegate listener should respond with some other value
* (perhaps a concatenation of the results of all the consumables).
* @param finalResultFunction a function that maps the response which terminated iteration to a response that will be sent to the
* delegate listener. This is useful if the delegate listener should receive some other value (perhaps
* a concatenation of the results of all the called consumables).
*/
public IteratingActionListener(ActionListener<T> delegate, BiConsumer<U, ActionListener<T>> consumer, List<U> consumables,
ThreadContext threadContext, @Nullable Supplier<T> consumablesFinishedResponse) {
ThreadContext threadContext, Function<T, T> finalResultFunction) {
this(delegate, consumer, consumables, threadContext, finalResultFunction, Objects::isNull);
}

/**
* Constructs an {@link IteratingActionListener}.
*
* @param delegate the delegate listener to call when all consumables have finished executing
* @param consumer the consumer that is executed for each consumable instance
* @param consumables the instances that can be consumed to produce a response which is ultimately sent on the delegate listener
* @param threadContext the thread context for the thread pool that created the listener
* @param finalResultFunction a function that maps the response which terminated iteration to a response that will be sent to the
* delegate listener. This is useful if the delegate listener should receive some other value (perhaps
* a concatenation of the results of all the called consumables).
* @param iterationPredicate a {@link Predicate} that checks if iteration should continue based on the returned result
*/
public IteratingActionListener(ActionListener<T> delegate, BiConsumer<U, ActionListener<T>> consumer, List<U> consumables,
ThreadContext threadContext, Function<T, T> finalResultFunction,
Predicate<T> iterationPredicate) {
this.delegate = delegate;
this.consumer = consumer;
this.consumables = Collections.unmodifiableList(consumables);
this.threadContext = threadContext;
this.consumablesFinishedResponse = consumablesFinishedResponse;
this.finalResultFunction = finalResultFunction;
this.iterationPredicate = iterationPredicate;
}

@Override
Expand All @@ -88,18 +109,15 @@ public void onResponse(T response) {
// we need to store the context here as there is a chance that this method is called from a thread outside of the ThreadPool
// like a LDAP connection reader thread and we can pollute the context in certain cases
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext(false)) {
if (response == null) {
final boolean continueIteration = iterationPredicate.test(response);
if (continueIteration) {
if (position == consumables.size()) {
if (consumablesFinishedResponse != null) {
delegate.onResponse(consumablesFinishedResponse.get());
} else {
delegate.onResponse(null);
}
delegate.onResponse(finalResultFunction.apply(response));
} else {
consumer.accept(consumables.get(position++), this);
}
} else {
delegate.onResponse(response);
delegate.onResponse(finalResultFunction.apply(response));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.xpack.core.security.authc.Realm;
import org.elasticsearch.xpack.core.security.authc.RealmConfig;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.authz.store.RoleRetrievalResult;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -72,16 +73,20 @@ default AuthenticationFailureHandler getAuthenticationFailureHandler() {
* should be asynchronous if the computation is lengthy or any disk and/or network
* I/O is involved. The implementation is responsible for resolving whatever roles
* it can into a set of {@link RoleDescriptor} instances. If successful, the
* implementation must invoke {@link ActionListener#onResponse(Object)} to pass along
* the resolved set of role descriptors. If a failure was encountered, the
* implementation must invoke {@link ActionListener#onFailure(Exception)}.
* implementation must wrap the set of {@link RoleDescriptor} instances in a
* {@link RoleRetrievalResult} using {@link RoleRetrievalResult#success(Set)} and then invoke
* {@link ActionListener#onResponse(Object)}. If a failure was encountered, the
* implementation should wrap the failure in a {@link RoleRetrievalResult} using
* {@link RoleRetrievalResult#failure(Exception)} and then invoke
* {@link ActionListener#onResponse(Object)} unless the failure needs to terminate the request,
* in which case the implementation should invoke {@link ActionListener#onFailure(Exception)}.
*
* By default, an empty list is returned.
*
* @param settings The configured settings for the node
* @param resourceWatcherService Use to watch configuration files for changes
*/
default List<BiConsumer<Set<String>, ActionListener<Set<RoleDescriptor>>>>
default List<BiConsumer<Set<String>, ActionListener<RoleRetrievalResult>>>
getRolesProviders(Settings settings, ResourceWatcherService resourceWatcherService) {
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.security.authz.store;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkAction;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
Expand All @@ -21,9 +22,12 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public class ReservedRolesStore {
public class ReservedRolesStore implements BiConsumer<Set<String>, ActionListener<RoleRetrievalResult>> {

public static final RoleDescriptor SUPERUSER_ROLE_DESCRIPTOR = new RoleDescriptor("superuser",
new String[] { "all" },
Expand Down Expand Up @@ -165,4 +169,18 @@ public Collection<RoleDescriptor> roleDescriptors() {
public static Set<String> names() {
return RESERVED_ROLES.keySet();
}
}

@Override
public void accept(Set<String> roleNames, ActionListener<RoleRetrievalResult> listener) {
final Set<RoleDescriptor> descriptors = roleNames.stream()
.map(RESERVED_ROLES::get)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
listener.onResponse(RoleRetrievalResult.success(descriptors));
}

@Override
public String toString() {
return "reserved roles store";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.security.authz.store;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;

import java.util.Objects;
import java.util.Set;

/**
* The result of attempting to retrieve roles from a roles provider. The result can either be
* successful or a failure. A successful result indicates that no errors occurred while retrieving
* roles, even if none of the requested roles could be found. A failure indicates an error
* occurred while retrieving the results but the error is not fatal and the request may be able
* to continue.
*/
public final class RoleRetrievalResult {

private final Set<RoleDescriptor> descriptors;

@Nullable
private final Exception failure;

private RoleRetrievalResult(Set<RoleDescriptor> descriptors, @Nullable Exception failure) {
if (descriptors != null && failure != null) {
throw new IllegalArgumentException("either descriptors or failure must be null");
}
this.descriptors = descriptors;
this.failure = failure;
}

/**
* @return the resolved descriptors or {@code null} if there was a failure
*/
public Set<RoleDescriptor> getDescriptors() {
return descriptors;
}

/**
* @return the failure or {@code null} if retrieval succeeded
*/
@Nullable
public Exception getFailure() {
return failure;
}

/**
* @return true if the retrieval succeeded
*/
public boolean isSuccess() {
return descriptors != null;
}

/**
* Creates a successful result with the provided {@link RoleDescriptor} set,
* which must be non-null
*/
public static RoleRetrievalResult success(Set<RoleDescriptor> descriptors) {
Objects.requireNonNull(descriptors, "descriptors must not be null if successful");
return new RoleRetrievalResult(descriptors, null);
}

/**
* Creates a failed result with the provided non-null exception
*/
public static RoleRetrievalResult failure(Exception e) {
Objects.requireNonNull(e, "Exception must be provided");
return new RoleRetrievalResult(null, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.collect.HppcMaps.Object;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -18,8 +17,12 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;

import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;

public class IteratingActionListenerTests extends ESTestCase {
Expand Down Expand Up @@ -136,4 +139,49 @@ public void testFailure() {
assertEquals(numberOfIterations, iterations.get());
assertTrue(onFailureCalled.get());
}

public void testFunctionApplied() {
final int numberOfItems = scaledRandomIntBetween(2, 32);
final int numberOfIterations = scaledRandomIntBetween(1, numberOfItems);
List<Object> items = new ArrayList<>(numberOfItems);
for (int i = 0; i < numberOfItems; i++) {
items.add(new Object());
}

final AtomicInteger iterations = new AtomicInteger(0);
final Predicate<Object> iterationPredicate = object -> {
final int current = iterations.incrementAndGet();
return current != numberOfIterations;
};
final BiConsumer<Object, ActionListener<Object>> consumer = (listValue, listener) -> {
listener.onResponse(items.get(iterations.get()));
};

final AtomicReference<Object> originalObject = new AtomicReference<>();
final AtomicReference<Object> result = new AtomicReference<>();
final Function<Object, Object> responseFunction = object -> {
originalObject.set(object);
Object randomResult;
do {
randomResult = randomFrom(items);
} while (randomResult == object);
result.set(randomResult);
return randomResult;
};

IteratingActionListener<Object, Object> iteratingListener = new IteratingActionListener<>(ActionListener.wrap((object) -> {
assertNotNull(object);
assertNotNull(originalObject.get());
assertThat(object, sameInstance(result.get()));
assertThat(object, not(sameInstance(originalObject.get())));
assertThat(originalObject.get(), sameInstance(items.get(iterations.get() - 1)));
}, (e) -> {
logger.error("unexpected exception", e);
fail("exception should not have been thrown");
}), consumer, items, new ThreadContext(Settings.EMPTY), responseFunction, iterationPredicate);
iteratingListener.run();

// we never really went async, its all chained together so verify this for sanity
assertEquals(numberOfIterations, iterations.get());
}
}
Loading

0 comments on commit bbb606a

Please sign in to comment.