Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multiple unicast host providers #31509

Merged
merged 2 commits into from
Jun 22, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public AzureUnicastHostsProvider(Settings settings, AzureComputeService azureCom
* Setting `cloud.azure.refresh_interval` to `0` will disable caching (default).
*/
@Override
public List<TransportAddress> buildDynamicHosts() {
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
if (refreshInterval.millis() != 0) {
if (dynamicHosts != null &&
(refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
}

@Override
public List<TransportAddress> buildDynamicHosts() {
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
return dynamicHosts.getOrRefresh();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int no
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY, nodes, tagsList)) {
AwsEc2UnicastHostsProvider provider = new AwsEc2UnicastHostsProvider(nodeSettings, transportService, plugin.ec2Service);
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts(null);
logger.debug("--> addresses found: {}", dynamicHosts);
return dynamicHosts;
} catch (IOException e) {
Expand Down Expand Up @@ -307,7 +307,7 @@ protected List<TransportAddress> fetchDynamicNodes() {
}
};
for (int i=0; i<3; i++) {
provider.buildDynamicHosts();
provider.buildDynamicHosts(null);
}
assertThat(provider.fetchCount, is(3));
}
Expand All @@ -324,12 +324,12 @@ protected List<TransportAddress> fetchDynamicNodes() {
}
};
for (int i=0; i<3; i++) {
provider.buildDynamicHosts();
provider.buildDynamicHosts(null);
}
assertThat(provider.fetchCount, is(1));
Thread.sleep(1_000L); // wait for cache to expire
for (int i=0; i<3; i++) {
provider.buildDynamicHosts();
provider.buildDynamicHosts(null);
}
assertThat(provider.fetchCount, is(2));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,17 @@

package org.elasticsearch.discovery.file;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
Expand All @@ -57,47 +39,19 @@
*/
public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin {

private static final Logger logger = Loggers.getLogger(FileBasedDiscoveryPlugin.class);

private final Settings settings;
private final Path configPath;
private ExecutorService fileBasedDiscoveryExecutorService;

public FileBasedDiscoveryPlugin(Settings settings, Path configPath) {
this.settings = settings;
this.configPath = configPath;
}

@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
final int concurrentConnects = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[file_based_discovery_resolve]");
fileBasedDiscoveryExecutorService = EsExecutors.newScaling(
Node.NODE_NAME_SETTING.get(settings) + "/" + "file_based_discovery_resolve",
0,
concurrentConnects,
60,
TimeUnit.SECONDS,
threadFactory,
threadPool.getThreadContext());

return Collections.emptyList();
}

@Override
public void close() throws IOException {
ThreadPool.terminate(fileBasedDiscoveryExecutorService, 0, TimeUnit.SECONDS);
}

@Override
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
NetworkService networkService) {
return Collections.singletonMap(
"file",
() -> new FileBasedUnicastHostsProvider(
new Environment(settings, configPath), transportService, fileBasedDiscoveryExecutorService));
() -> new FileBasedUnicastHostsProvider(new Environment(settings, configPath)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,19 @@
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.env.Environment;
import org.elasticsearch.transport.TransportService;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT;
import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveHostsLists;

/**
* An implementation of {@link UnicastHostsProvider} that reads hosts/ports
* from {@link #UNICAST_HOSTS_FILE}.
Expand All @@ -59,23 +52,15 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast

static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt";

private final TransportService transportService;
private final ExecutorService executorService;

private final Path unicastHostsFilePath;

private final TimeValue resolveTimeout;

FileBasedUnicastHostsProvider(Environment environment, TransportService transportService, ExecutorService executorService) {
FileBasedUnicastHostsProvider(Environment environment) {
super(environment.settings());
this.transportService = transportService;
this.executorService = executorService;
this.unicastHostsFilePath = environment.configFile().resolve("discovery-file").resolve(UNICAST_HOSTS_FILE);
this.resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
}

@Override
public List<TransportAddress> buildDynamicHosts() {
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
List<String> hostsList;
try (Stream<String> lines = Files.lines(unicastHostsFilePath)) {
hostsList = lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments
Expand All @@ -90,21 +75,8 @@ public List<TransportAddress> buildDynamicHosts() {
hostsList = Collections.emptyList();
}

final List<TransportAddress> dynamicHosts = new ArrayList<>();
try {
dynamicHosts.addAll(resolveHostsLists(
executorService,
logger,
hostsList,
1,
transportService,
resolveTimeout));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

final List<TransportAddress> dynamicHosts = hostsResolver.resolveHosts(hostsList, 1);
logger.debug("[discovery-file] Using dynamic discovery nodes {}", dynamicHosts);

return dynamicHosts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
Expand Down Expand Up @@ -123,8 +125,10 @@ public void testUnicastHostsDoesNotExist() throws Exception {
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.build();
final Environment environment = TestEnvironment.newEnvironment(settings);
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment, transportService, executorService);
final List<TransportAddress> addresses = provider.buildDynamicHosts();
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment);
final List<TransportAddress> addresses = provider.buildDynamicHosts((hosts, limitPortCounts) ->
UnicastZenPing.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService,
TimeValue.timeValueSeconds(10)));
assertEquals(0, addresses.size());
}

Expand Down Expand Up @@ -163,6 +167,8 @@ private List<TransportAddress> setupAndRunHostProvider(final List<String> hostEn
}

return new FileBasedUnicastHostsProvider(
new Environment(settings, configPath), transportService, executorService).buildDynamicHosts();
new Environment(settings, configPath)).buildDynamicHosts((hosts, limitPortCounts) ->
UnicastZenPing.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService,
TimeValue.timeValueSeconds(10)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public GceUnicastHostsProvider(Settings settings, GceInstancesService gceInstanc
* Information can be cached using `cloud.gce.refresh_interval` property if needed.
*/
@Override
public List<TransportAddress> buildDynamicHosts() {
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
// We check that needed properties have been set
if (this.project == null || this.project.isEmpty() || this.zones == null || this.zones.isEmpty()) {
throw new IllegalArgumentException("one or more gce discovery settings are missing. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected List<TransportAddress> buildDynamicNodes(GceInstancesServiceImpl gceIn
GceUnicastHostsProvider provider = new GceUnicastHostsProvider(nodeSettings, gceInstancesService,
transportService, new NetworkService(Collections.emptyList()));

List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts(null);
logger.info("--> addresses found: {}", dynamicHosts);
return dynamicHosts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.env.Environment;
Expand Down Expand Up @@ -357,7 +358,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING,
ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING,
ZenDiscovery.MAX_PENDING_CLUSTER_STATES_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT,
SearchService.DEFAULT_KEEPALIVE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.single.SingleNodeDiscovery;
import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.plugins.DiscoveryPlugin;
Expand All @@ -42,13 +44,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* A module for loading classes for node discovery.
Expand All @@ -57,18 +61,18 @@ public class DiscoveryModule {

public static final Setting<String> DISCOVERY_TYPE_SETTING =
new Setting<>("discovery.type", "zen", Function.identity(), Property.NodeScope);
public static final Setting<Optional<String>> DISCOVERY_HOSTS_PROVIDER_SETTING =
new Setting<>("discovery.zen.hosts_provider", (String)null, Optional::ofNullable, Property.NodeScope);
public static final Setting<List<String>> DISCOVERY_HOSTS_PROVIDER_SETTING =
Setting.listSetting("discovery.zen.hosts_provider", Collections.emptyList(), Function.identity(), Property.NodeScope);

private final Discovery discovery;

public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
AllocationService allocationService) {
final UnicastHostsProvider hostsProvider;
final Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators = new ArrayList<>();
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
final Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
hostProviders.put("settings", () -> new SettingsBasedHostsProvider(settings, transportService));
for (DiscoveryPlugin plugin : plugins) {
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
Expand All @@ -80,17 +84,32 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
joinValidators.add(joinValidator);
}
}
Optional<String> hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
if (hostsProviderName.isPresent()) {
Supplier<UnicastHostsProvider> hostsProviderSupplier = hostProviders.get(hostsProviderName.get());
if (hostsProviderSupplier == null) {
throw new IllegalArgumentException("Unknown zen hosts provider [" + hostsProviderName.get() + "]");
}
hostsProvider = Objects.requireNonNull(hostsProviderSupplier.get());
} else {
hostsProvider = Collections::emptyList;
List<String> hostsProviderNames = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
// for bwc purposes, add settings provider even if not explicitly specified
if (hostsProviderNames.contains("settings") == false) {
List<String> extendedHostsProviderNames = new ArrayList<>();
extendedHostsProviderNames.add("settings");
extendedHostsProviderNames.addAll(hostsProviderNames);
hostsProviderNames = extendedHostsProviderNames;
}

final Set<String> missingProviderNames = new HashSet<>(hostsProviderNames);
missingProviderNames.removeAll(hostProviders.keySet());
if (missingProviderNames.isEmpty() == false) {
throw new IllegalArgumentException("Unknown zen hosts providers " + missingProviderNames);
}

List<UnicastHostsProvider> filteredHostsProviders = hostsProviderNames.stream()
.map(hostProviders::get).map(Supplier::get).collect(Collectors.toList());

final UnicastHostsProvider hostsProvider = hostsResolver -> {
final List<TransportAddress> addresses = new ArrayList<>();
for (UnicastHostsProvider provider : filteredHostsProviders) {
addresses.addAll(provider.buildDynamicHosts(hostsResolver));
}
return Collections.unmodifiableList(addresses);
};

Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
Expand Down
Loading