Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize remote license checker #32971

Merged
merged 24 commits into from
Aug 20, 2018
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.license;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
import org.elasticsearch.protocol.xpack.license.LicenseStatus;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xpack.core.action.XPackInfoAction;

import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* Checks remote clusters for license compatibility with a specified license predicate.
*/
public final class RemoteClusterLicenseChecker {

/**
* Encapsulates the license info of a remote cluster.
*/
public static final class RemoteClusterLicenseInfo {

private final String clusterAlias;

/**
* The alias of the remote cluster.
*
* @return the cluster alias
*/
public String clusterAlias() {
return clusterAlias;
}

private final XPackInfoResponse.LicenseInfo licenseInfo;

/**
* The license info of the remote cluster.
*
* @return the license info
*/
public XPackInfoResponse.LicenseInfo licenseInfo() {
return licenseInfo;
}

RemoteClusterLicenseInfo(final String clusterAlias, final XPackInfoResponse.LicenseInfo licenseInfo) {
this.clusterAlias = clusterAlias;
this.licenseInfo = licenseInfo;
}

}

/**
* Encapsulates a remote cluster license check. The check is either successful if the license of the remote cluster is compatible with
* the predicate used to check license compatibility, or the check is a failure.
*/
public static final class LicenseCheck {

private final RemoteClusterLicenseInfo remoteClusterLicenseInfo;

/**
* The remote cluster license info. This method should only be invoked if this instance represents a failing license check.
*
* @return the remote cluster license info
*/
public RemoteClusterLicenseInfo remoteClusterLicenseInfo() {
assert isSuccess() == false;
return remoteClusterLicenseInfo;
}

private static final LicenseCheck SUCCESS = new LicenseCheck(null);

/**
* A successful license check.
*
* @return a successful license check instance
*/
public static LicenseCheck success() {
return SUCCESS;
}

/**
* Test if this instance represents a successful license check.
*
* @return true if this instance represents a successful license check, otherwise false
*/
public boolean isSuccess() {
return this == SUCCESS;
}

/**
* Creates a failing license check encapsulating the specified remote cluster license info.
*
* @param remoteClusterLicenseInfo the remote cluster license info
* @return a failing license check
*/
public static LicenseCheck failure(final RemoteClusterLicenseInfo remoteClusterLicenseInfo) {
return new LicenseCheck(remoteClusterLicenseInfo);
}

private LicenseCheck(final RemoteClusterLicenseInfo remoteClusterLicenseInfo) {
this.remoteClusterLicenseInfo = remoteClusterLicenseInfo;
}

}

private final Client client;
private final Predicate<XPackInfoResponse.LicenseInfo> predicate;

/**
* Constructs a remote cluster license checker with the specified license predicate for checking license compatibility. The predicate
* does not need to check for the active license state as this is handled by the remote cluster license checker.
*
* @param client the client
* @param predicate the license predicate
*/
public RemoteClusterLicenseChecker(final Client client, final Predicate<XPackInfoResponse.LicenseInfo> predicate) {
this.client = client;
this.predicate = predicate;
}

public static boolean isLicensePlatinumOrTrial(final XPackInfoResponse.LicenseInfo licenseInfo) {
final License.OperationMode mode = License.OperationMode.resolve(licenseInfo.getMode());
return mode == License.OperationMode.PLATINUM || mode == License.OperationMode.TRIAL;
}

/**
* Checks the specified clusters for license compatibility. The specified callback will be invoked once if all clusters are
* license-compatible, otherwise the specified callback will be invoked once on the first cluster that is not license-compatible.
*
* @param clusterAliases the cluster aliases to check
* @param listener a callback
*/
public void checkRemoteClusterLicenses(final List<String> clusterAliases, final ActionListener<LicenseCheck> listener) {
final Iterator<String> clusterAliasesIterator = clusterAliases.iterator();
if (clusterAliasesIterator.hasNext() == false) {
listener.onResponse(LicenseCheck.success());
return;
}

final AtomicReference<String> clusterAlias = new AtomicReference<>();

final ActionListener<XPackInfoResponse> infoListener = new ActionListener<XPackInfoResponse>() {

@Override
public void onResponse(final XPackInfoResponse xPackInfoResponse) {
final XPackInfoResponse.LicenseInfo licenseInfo = xPackInfoResponse.getLicenseInfo();
if ((licenseInfo.getStatus() == LicenseStatus.ACTIVE) == false || predicate.test(licenseInfo) == false) {
listener.onResponse(LicenseCheck.failure(new RemoteClusterLicenseInfo(clusterAlias.get(), licenseInfo)));
return;
}

if (clusterAliasesIterator.hasNext()) {
clusterAlias.set(clusterAliasesIterator.next());
// recurse to the next cluster
remoteClusterLicense(clusterAlias.get(), this);
} else {
listener.onResponse(LicenseCheck.success());
}
}

@Override
public void onFailure(final Exception e) {
final String message = "could not determine the license type for cluster [" + clusterAlias.get() + "]";
listener.onFailure(new ElasticsearchException(message, e));
}

};

// check the license on the first cluster, and then we recursively check licenses on the remaining clusters
clusterAlias.set(clusterAliasesIterator.next());
remoteClusterLicense(clusterAlias.get(), infoListener);
}

private void remoteClusterLicense(final String clusterAlias, final ActionListener<XPackInfoResponse> listener) {
final ThreadContext threadContext = client.threadPool().getThreadContext();
final ContextPreservingActionListener<XPackInfoResponse> contextPreservingActionListener =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes a bug in the existing implementation. This bug does not impact the existing usage of checking the license of a remote cluster. This is because the existing context is not needed when starting a data feed in ML. However, in CCR after we check the remote license we will execute another action (the follow index action). Prior to this fix, that action would execute under the system context. The system context runs with limited privileges. This means that invoking that additional action would not have privileges to execute. Instead, this action should be executed with the original context to preserve the user that initiated the action. This is why we must preserve the context here. There is a test for this behavior in the unit tests for this class.

new ContextPreservingActionListener<>(threadContext.newRestorableContext(false), listener);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we stash any context here since this is an internal execution and should not leak any existing context information
threadContext.markAsSystemContext();

final XPackInfoRequest request = new XPackInfoRequest();
request.setCategories(EnumSet.of(XPackInfoRequest.Category.LICENSE));
try {
client.getRemoteClusterClient(clusterAlias).execute(XPackInfoAction.INSTANCE, request, contextPreservingActionListener);
} catch (final Exception e) {
contextPreservingActionListener.onFailure(e);
}
}
}

/**
* Predicate to test if the index name represents the name of a remote index.
*
* @param index the index name
* @return true if the collection of indices contains a remote index, otherwise false
*/
public static boolean isRemoteIndex(final String index) {
return index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) != -1;
}

/**
* Predicate to test if the collection of index names contains any that represent the name of a remote index.
*
* @param indices the collection of index names
* @return true if the collection of index names contains a name that represents a remote index, otherwise false
*/
public static boolean containsRemoteIndex(final List<String> indices) {
return indices.stream().anyMatch(RemoteClusterLicenseChecker::isRemoteIndex);
}

/**
* Filters the collection of index names for names that represent a remote index. Remote index names are of the form
* {@code cluster_name:index_name}.
*
* @param indices the collection of index names
* @return list of index names that represent remote index names
*/
public static List<String> remoteIndices(final List<String> indices) {
return indices.stream().filter(RemoteClusterLicenseChecker::isRemoteIndex).collect(Collectors.toList());
}

/**
* Extract the list of remote cluster aliases from the list of index names. Remote index names are of the form
* {@code cluster_alias:index_name} and the cluster_alias is extracted for each index name that represents a remote index.
*
* @param indices the collection of index names
* @return the remote cluster names
*/
public static List<String> remoteClusterAliases(final List<String> indices) {
return indices.stream()
.filter(RemoteClusterLicenseChecker::isRemoteIndex)
.map(index -> index.substring(0, index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR)))
.distinct()
.collect(Collectors.toList());
}

/**
* Constructs an error message for license incompatibility.
*
* @param feature the name of the feature that initiated the remote cluster license check.
* @param remoteClusterLicenseInfo the remote cluster license info of the cluster that failed the license check
* @return an error message representing license incompatibility
*/
public static String buildErrorMessage(
final String feature,
final RemoteClusterLicenseInfo remoteClusterLicenseInfo,
final Predicate<XPackInfoResponse.LicenseInfo> predicate) {
final StringBuilder error = new StringBuilder();
if (remoteClusterLicenseInfo.licenseInfo().getStatus() != LicenseStatus.ACTIVE) {
error.append(String.format(Locale.ROOT, "the license on cluster [%s] is not active", remoteClusterLicenseInfo.clusterAlias()));
} else {
assert predicate.test(remoteClusterLicenseInfo.licenseInfo()) == false : "license must be incompatible to build error message";
final String message = String.format(
Locale.ROOT,
"the license mode [%s] on cluster [%s] does not enable [%s]",
License.OperationMode.resolve(remoteClusterLicenseInfo.licenseInfo().getMode()),
remoteClusterLicenseInfo.clusterAlias(),
feature);
error.append(message);
}

return error.toString();
}

}
Loading