Skip to content

Commit

Permalink
SOLR-17098: Only use ZK ACLs for default ZK Host
Browse files Browse the repository at this point in the history
  • Loading branch information
HoustonPutman committed Dec 21, 2023
1 parent 65920af commit e2bf1f4
Show file tree
Hide file tree
Showing 14 changed files with 190 additions and 20 deletions.
3 changes: 3 additions & 0 deletions solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ Bug Fixes

* SOLR-17060: CoreContainer#create may deadlock with concurrent requests for metrics (Alex Deparvu, David Smiley)

* SOLR-17098: ZK Credentials and ACLs are no longer sent to all ZK Servers when using Streaming Expressions.
They will only be used when sent to the default ZK Host. (Houston Putman, Jan Høydahl, David Smiley, Gus Heck, Qing Xu)

Dependency Upgrades
---------------------
* SOLR-17012: Update Apache Hadoop to 3.3.6 and Apache Curator to 5.5.0 (Kevin Risden)
Expand Down
1 change: 1 addition & 0 deletions solr/core/src/java/org/apache/solr/core/CoreContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ private void loadInternal() {

zkSys.initZooKeeper(this, cfg.getCloudConfig());
if (isZooKeeperAware()) {
solrClientCache.setDefaultZKHost(getZkController().getZkServerAddress());
// initialize ZkClient metrics
zkSys.getZkMetricsProducer().initializeMetrics(solrMetricsContext, "zkClient");
pkiAuthenticationSecurityBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,7 @@ Worker collections can be empty collections that exist only to execute streaming
* `StreamExpression`: Expression to send to the worker collection.
* `workers`: Number of workers in the worker collection to send the expression to.
* `zkHost`: (Optional) The ZooKeeper connect string where the worker collection resides.
Zookeeper Credentials and ACLs will only be included if the same ZkHost is used as the Solr instance that you are connecting to (the `chroot` can be different).
* `sort`: The sort criteria for ordering tuples returned by the worker nodes.

=== parallel Syntax
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ To read more about the `/export` handler requirements review the section xref:ex
* `fl`: (Mandatory) The list of fields to return.
* `sort`: (Mandatory) The sort criteria.
* `zkHost`: Only needs to be defined if the collection being searched is found in a different zkHost than the local stream handler.
Zookeeper Credentials and ACLs will only be included if the same ZkHost is used as the Solr instance that you are connecting to (the `chroot` can be different).
* `qt`: Specifies the query type, or request handler, to use.
Set this to `/export` to work with large result sets.
The default is `/select`.
Expand Down Expand Up @@ -484,6 +485,7 @@ When used in parallel mode the partitionKeys parameter must be provided.
* `fl`: (Mandatory) The list of fields to return.
* `sort`: (Mandatory) The sort criteria.
* `zkHost`: Only needs to be defined if the collection being searched is found in a different zkHost than the local stream handler.
Zookeeper Credentials and ACLs will only be included if the same ZkHost is used as the Solr instance that you are connecting to (the `chroot` can be different).
* `partitionKeys`: Comma delimited list of keys to partition the search results by.
To be used with the parallel function for parallelizing operations across worker nodes.
See the xref:stream-decorator-reference.adoc#parallel[parallel] function for details.
Expand Down Expand Up @@ -648,6 +650,7 @@ The checkpoints will be saved under this id.
If not set, it defaults to the highest version in the index.
Setting to 0 will process all records that match query in the index.
* `zkHost`: (Optional) Only needs to be defined if the collection being searched is found in a different zkHost than the local stream handler.
Zookeeper Credentials and ACLs will only be included if the same ZkHost is used as the Solr instance that you are connecting to (the `chroot` can be different).

=== topic Syntax

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
Expand Down Expand Up @@ -57,6 +58,7 @@ public class SolrClientCache implements Closeable {
private final HttpClient apacheHttpClient;
private final Http2SolrClient http2SolrClient;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicReference<String> defaultZkHost = new AtomicReference<>();

public SolrClientCache() {
this.apacheHttpClient = null;
Expand All @@ -74,40 +76,71 @@ public SolrClientCache(Http2SolrClient http2SolrClient) {
this.http2SolrClient = http2SolrClient;
}

public void setDefaultZKHost(String zkHost) {
if (zkHost != null) {
zkHost = zkHost.split("/")[0];
if (!zkHost.isEmpty()) {
defaultZkHost.set(zkHost);
} else {
defaultZkHost.set(null);
}
}
}

public synchronized CloudSolrClient getCloudSolrClient(String zkHost) {
ensureOpen();
Objects.requireNonNull(zkHost, "ZooKeeper host cannot be null!");
if (solrClients.containsKey(zkHost)) {
return (CloudSolrClient) solrClients.get(zkHost);
}
// Can only use ZK ACLs if there is a default ZK Host, and the given ZK host contains that
// default.
// Basically the ZK ACLs are assumed to be only used for the default ZK host,
// thus we should only provide the ACLs to that Zookeeper instance.
String zkHostNoChroot = zkHost.split("/")[0];
boolean canUseACLs =
Optional.ofNullable(defaultZkHost.get()).map(zkHostNoChroot::equals).orElse(false);
final CloudSolrClient client;
if (apacheHttpClient != null) {
client = newCloudLegacySolrClient(zkHost, apacheHttpClient);
client = newCloudLegacySolrClient(zkHost, apacheHttpClient, canUseACLs);
} else {
client = newCloudHttp2SolrClient(zkHost, http2SolrClient);
client = newCloudHttp2SolrClient(zkHost, http2SolrClient, canUseACLs);
}
solrClients.put(zkHost, client);
return client;
}

@Deprecated
private static CloudSolrClient newCloudLegacySolrClient(String zkHost, HttpClient httpClient) {
private static CloudSolrClient newCloudLegacySolrClient(
String zkHost, HttpClient httpClient, boolean canUseACLs) {
final List<String> hosts = List.of(zkHost);
var builder = new CloudLegacySolrClient.Builder(hosts, Optional.empty());
builder.canUseZkACLs(canUseACLs);
adjustTimeouts(builder, httpClient);
var client = builder.build();
client.connect();
try {
client.connect();
} catch (Exception e) {
IOUtils.closeQuietly(client);
throw e;
}
return client;
}

private static CloudHttp2SolrClient newCloudHttp2SolrClient(
String zkHost, Http2SolrClient http2SolrClient) {
String zkHost, Http2SolrClient http2SolrClient, boolean canUseACLs) {
final List<String> hosts = List.of(zkHost);
var builder = new CloudHttp2SolrClient.Builder(hosts, Optional.empty());
builder.canUseZkACLs(canUseACLs);
// using internal builder to ensure the internal client gets closed
builder = builder.withInternalClientBuilder(newHttp2SolrClientBuilder(null, http2SolrClient));
var client = builder.build();
client.connect();
try {
client.connect();
} catch (Exception e) {
IOUtils.closeQuietly(client);
throw e;
}
return client;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.io;

import java.util.Map;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.DigestZkACLProvider;
import org.apache.solr.common.cloud.DigestZkCredentialsProvider;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.VMParamsZkCredentialsInjector;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class SolrClientCacheTest extends SolrCloudTestCase {

private static final Map<String, String> sysProps =
Map.of(
SolrZkClient.ZK_CREDENTIALS_INJECTOR_CLASS_NAME_VM_PARAM_NAME,
VMParamsZkCredentialsInjector.class.getName(),
SolrZkClient.ZK_CRED_PROVIDER_CLASS_NAME_VM_PARAM_NAME,
DigestZkCredentialsProvider.class.getName(),
SolrZkClient.ZK_ACL_PROVIDER_CLASS_NAME_VM_PARAM_NAME,
DigestZkACLProvider.class.getName(),
VMParamsZkCredentialsInjector.DEFAULT_DIGEST_USERNAME_VM_PARAM_NAME, "admin-user",
VMParamsZkCredentialsInjector.DEFAULT_DIGEST_PASSWORD_VM_PARAM_NAME, "pass",
VMParamsZkCredentialsInjector.DEFAULT_DIGEST_READONLY_USERNAME_VM_PARAM_NAME, "read-user",
VMParamsZkCredentialsInjector.DEFAULT_DIGEST_READONLY_PASSWORD_VM_PARAM_NAME, "pass");

@BeforeClass
public static void before() throws Exception {
sysProps.forEach(System::setProperty);
configureCluster(1)
.formatZkServer(true)
.addConfig("config", getFile("solrj/solr/configsets/streaming/conf").toPath())
.configure();
}

@AfterClass
public static void after() {
sysProps.keySet().forEach(System::clearProperty);
}

@Test
public void testZkACLsNotUsedWithDifferentZkHost() {
try (SolrClientCache cache = new SolrClientCache()) {
// This ZK Host is fake, thus the ZK ACLs should not be used
cache.setDefaultZKHost("test:2181");
expectThrows(
SolrException.class, () -> cache.getCloudSolrClient(zkClient().getZkServerAddress()));
}
}

@Test
public void testZkACLsUsedWithDifferentChroot() {
try (SolrClientCache cache = new SolrClientCache()) {
// The same ZK Host is used, so the ZK ACLs should still be applied
cache.setDefaultZKHost(zkClient().getZkServerAddress() + "/random/chroot");
cache.getCloudSolrClient(zkClient().getZkServerAddress());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ZkClientClusterStateProvider
volatile ZkStateReader zkStateReader;
private boolean closeZkStateReader = true;
private final String zkHost;
private final boolean canUseZkACLs;
private int zkConnectTimeout = SolrZkClientTimeout.DEFAULT_ZK_CONNECT_TIMEOUT;
private int zkClientTimeout = SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT;

Expand All @@ -65,14 +66,22 @@ public ZkClientClusterStateProvider(ZkStateReader zkStateReader) {
this.zkStateReader = zkStateReader;
this.closeZkStateReader = false;
this.zkHost = null;
this.canUseZkACLs = true;
}

public ZkClientClusterStateProvider(Collection<String> zkHosts, String chroot) {
this(zkHosts, chroot, true);
}

public ZkClientClusterStateProvider(
Collection<String> zkHosts, String chroot, boolean canUseZkACLs) {
zkHost = buildZkHostString(zkHosts, chroot);
this.canUseZkACLs = canUseZkACLs;
}

public ZkClientClusterStateProvider(String zkHost) {
this.zkHost = zkHost;
this.canUseZkACLs = true;
}

/**
Expand Down Expand Up @@ -212,7 +221,7 @@ public ZkStateReader getZkStateReader() {
if (zkStateReader == null) {
ZkStateReader zk = null;
try {
zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout, canUseZkACLs);
zk.createClusterStateWatchersAndUpdate();
log.info("Cluster at {} ready", zkHost);
zkStateReader = zk;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public SolrZkClient(Builder builder) {
builder.zkACLProvider,
builder.higherLevelIsClosed,
builder.compressor,
builder.solrClassLoader);
builder.solrClassLoader,
builder.useDefaultCredsAndACLs);
}

private SolrZkClient(
Expand All @@ -131,7 +132,8 @@ private SolrZkClient(
ZkACLProvider zkACLProvider,
IsClosed higherLevelIsClosed,
Compressor compressor,
SolrClassLoader solrClassLoader) {
SolrClassLoader solrClassLoader,
boolean useDefaultCredsAndACLs) {

if (zkServerAddress == null) {
// only tests should create one without server address
Expand All @@ -148,9 +150,14 @@ private SolrZkClient(

this.solrClassLoader = solrClassLoader;
if (!strat.hasZkCredentialsToAddAutomatically()) {
zkCredentialsInjector = createZkCredentialsInjector();
zkCredentialsInjector =
useDefaultCredsAndACLs
? createZkCredentialsInjector()
: new DefaultZkCredentialsInjector();
ZkCredentialsProvider zkCredentialsToAddAutomatically =
createZkCredentialsToAddAutomatically();
useDefaultCredsAndACLs
? createZkCredentialsToAddAutomatically()
: new DefaultZkCredentialsProvider();
strat.setZkCredentialsToAddAutomatically(zkCredentialsToAddAutomatically);
}

Expand Down Expand Up @@ -210,7 +217,8 @@ private SolrZkClient(
}
assert ObjectReleaseTracker.track(this);
if (zkACLProvider == null) {
this.zkACLProvider = createZkACLProvider();
this.zkACLProvider =
useDefaultCredsAndACLs ? createZkACLProvider() : new DefaultZkACLProvider();
} else {
this.zkACLProvider = zkACLProvider;
}
Expand Down Expand Up @@ -1134,6 +1142,7 @@ public static class Builder {
public ZkACLProvider zkACLProvider;
public IsClosed higherLevelIsClosed;
public SolrClassLoader solrClassLoader;
public boolean useDefaultCredsAndACLs = true;

public Compressor compressor;

Expand Down Expand Up @@ -1199,6 +1208,11 @@ public Builder withSolrClassLoader(SolrClassLoader solrClassLoader) {
return this;
}

public Builder withUseDefaultCredsAndACLs(boolean useDefaultCredsAndACLs) {
this.useDefaultCredsAndACLs = useDefaultCredsAndACLs;
return this;
}

public SolrZkClient build() {
return new SolrZkClient(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,20 @@ public ZkStateReader(SolrZkClient zkClient, Runnable securityNodeListener) {
}

public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) {
this.zkClient =
this(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, true);
}

public ZkStateReader(
String zkServerAddress,
int zkClientTimeout,
int zkClientConnectTimeout,
boolean canUseZkACLs) {
SolrZkClient.Builder builder =
new SolrZkClient.Builder()
.withUrl(zkServerAddress)
.withTimeout(zkClientTimeout, TimeUnit.MILLISECONDS)
.withConnTimeOut(zkClientConnectTimeout, TimeUnit.MILLISECONDS)
.withUseDefaultCredsAndACLs(canUseZkACLs)
.withReconnectListener(
() -> {
// on reconnect, reload cloud info
Expand All @@ -425,8 +434,8 @@ public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientCo
log.error("Interrupted", e);
throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "Interrupted", e);
}
})
.build();
});
this.zkClient = builder.build();
this.closeClient = true;
this.securityNodeWatcher = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public static class Builder {
private int parallelCacheRefreshesLocks = 3;
private int zkConnectTimeout = SolrZkClientTimeout.DEFAULT_ZK_CONNECT_TIMEOUT;
private int zkClientTimeout = SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT;
private boolean canUseZkACLs = true;

/**
* Provide a series of Solr URLs to be used when configuring {@link CloudHttp2SolrClient}
Expand Down Expand Up @@ -189,6 +190,12 @@ public Builder(List<String> zkHosts, Optional<String> zkChroot) {
if (zkChroot.isPresent()) this.zkChroot = zkChroot.get();
}

/** Whether or not to use the default ZK ACLs when building a ZK Client. */
public Builder canUseZkACLs(boolean canUseZkACLs) {
this.canUseZkACLs = canUseZkACLs;
return this;
}

/**
* Tells {@link Builder} that created clients should be configured such that {@link
* CloudSolrClient#isUpdatesToLeaders} returns <code>true</code>.
Expand Down Expand Up @@ -406,7 +413,8 @@ public CloudHttp2SolrClient build() {
throw new IllegalArgumentException(
"Both zkHost(s) & solrUrl(s) have been specified. Only specify one.");
} else if (!zkHosts.isEmpty()) {
stateProvider = ClusterStateProvider.newZkClusterStateProvider(zkHosts, zkChroot);
stateProvider =
ClusterStateProvider.newZkClusterStateProvider(zkHosts, zkChroot, canUseZkACLs);
if (stateProvider instanceof SolrZkClientTimeoutAware) {
var timeoutAware = (SolrZkClientTimeoutAware) stateProvider;
timeoutAware.setZkClientTimeout(zkClientTimeout);
Expand Down
Loading

0 comments on commit e2bf1f4

Please sign in to comment.