Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-17516: LBHttpSolrClient: support HttpJdkSolrClient (multiple class verson) #2805

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -821,10 +821,6 @@ protected String allProcessorSupportedContentTypesCommaDelimited(
.collect(Collectors.joining(", "));
}

protected RequestWriter getRequestWriter() {
return requestWriter;
}

/**
* An Http2SolrClient that doesn't close or cleanup any resources
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ public NamedList<Object> request(SolrRequest<?> solrRequest, String collection)
"Timeout occurred while waiting response from server at: " + pReq.url, e);
} catch (SolrException se) {
throw se;
} catch (RuntimeException re) {
throw new SolrServerException(re);
} catch (IOException | RuntimeException e) {
throw new SolrServerException(e);
} finally {
if (pReq.contentWritingFuture != null) {
pReq.contentWritingFuture.cancel(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ protected ResponseParser responseParser(SolrRequest<?> solrRequest) {
return solrRequest.getResponseParser() == null ? this.parser : solrRequest.getResponseParser();
}

protected RequestWriter getRequestWriter() {
return requestWriter;
}

// TODO: Remove this for 10.0, there is a typo in the method name
@Deprecated(since = "9.8", forRemoval = true)
protected ModifiableSolrParams initalizeSolrParams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,308 +16,21 @@
*/
package org.apache.solr.client.solrj.impl;

import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
public class LBHttp2SolrClient extends LBHttpSolrClientBase<Http2SolrClient> {

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.IsUpdateRequest;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.slf4j.MDC;

/**
* LBHttp2SolrClient or "LoadBalanced LBHttp2SolrClient" is a load balancing wrapper around {@link
* Http2SolrClient}. This is useful when you have multiple Solr endpoints and requests need to be
* Load Balanced among them.
*
* <p>Do <b>NOT</b> use this class for indexing in leader/follower scenarios since documents must be
* sent to the correct leader; no inter-node routing is done.
*
* <p>In SolrCloud (leader/replica) scenarios, it is usually better to use {@link CloudSolrClient},
* but this class may be used for updates because the server will forward them to the appropriate
* leader.
*
* <p>It offers automatic failover when a server goes down, and it detects when the server comes
* back up.
*
* <p>Load balancing is done using a simple round-robin on the list of endpoints. Endpoint URLs are
* expected to point to the Solr "root" path (i.e. "/solr").
*
* <blockquote>
*
* <pre>
* SolrClient client = new LBHttp2SolrClient.Builder(http2SolrClient,
* new LBSolrClient.Endpoint("http://host1:8080/solr"), new LBSolrClient.Endpoint("http://host2:8080/solr"))
* .build();
* </pre>
*
* </blockquote>
*
* Users who wish to balance traffic across a specific set of replicas or cores may specify each
* endpoint as a root-URL and core-name pair. For example:
*
* <blockquote>
*
* <pre>
* SolrClient client = new LBHttp2SolrClient.Builder(http2SolrClient,
* new LBSolrClient.Endpoint("http://host1:8080/solr", "coreA"),
* new LBSolrClient.Endpoint("http://host2:8080/solr", "coreB"))
* .build();
* </pre>
*
* </blockquote>
*
* <p>If a request to an endpoint fails by an IOException due to a connection timeout or read
* timeout then the host is taken off the list of live endpoints and moved to a 'dead endpoint list'
* and the request is resent to the next live endpoint. This process is continued till it tries all
* the live endpoints. If at least one endpoint is alive, the request succeeds, and if not it fails.
*
* <p>Dead endpoints are periodically healthchecked on a fixed interval controlled by {@link
* LBHttp2SolrClient.Builder#setAliveCheckInterval(int, TimeUnit)}. The default is set to one
* minute.
*
* <p><b>When to use this?</b><br>
* This can be used as a software load balancer when you do not wish to set up an external load
* balancer. Alternatives to this code are to use a dedicated hardware load balancer or using Apache
* httpd with mod_proxy_balancer as a load balancer. See <a
* href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load balancing on Wikipedia</a>
*
* @since solr 8.0
*/
public class LBHttp2SolrClient extends LBSolrClient {
private final Http2SolrClient solrClient;

private LBHttp2SolrClient(Builder builder) {
super(Arrays.asList(builder.solrEndpoints));
this.solrClient = builder.http2SolrClient;
this.aliveCheckIntervalMillis = builder.aliveCheckIntervalMillis;
this.defaultCollection = builder.defaultCollection;
protected LBHttp2SolrClient(Builder builder) {
super(builder);
}

@Override
protected SolrClient getClient(Endpoint endpoint) {
return solrClient;
}

@Override
public ResponseParser getParser() {
return solrClient.getParser();
}

@Override
public RequestWriter getRequestWriter() {
return solrClient.getRequestWriter();
}

public Set<String> getUrlParamNames() {
return solrClient.getUrlParamNames();
}

/**
* Execute an asynchronous request against one or more hosts for a given collection. The passed-in
* Req object includes a List of Endpoints. This method always begins with the first Endpoint in
* the list and if unsuccessful tries each in turn until the request is successful. Consequently,
* this method does not actually Load Balance. It is up to the caller to shuffle the List of
* Endpoints if Load Balancing is desired.
*
* @param req the wrapped request to perform
* @return a {@link CompletableFuture} that tracks the progress of the async request.
*/
public CompletableFuture<Rsp> requestAsync(Req req) {
CompletableFuture<Rsp> apiFuture = new CompletableFuture<>();
Rsp rsp = new Rsp();
boolean isNonRetryable =
req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath());
EndpointIterator it = new EndpointIterator(req, zombieServers);
AtomicReference<CompletableFuture<NamedList<Object>>> currentFuture = new AtomicReference<>();
RetryListener retryListener =
new RetryListener() {

@Override
public void onSuccess(Rsp rsp) {
apiFuture.complete(rsp);
}

@Override
public void onFailure(Exception e, boolean retryReq) {
if (retryReq) {
Endpoint url;
try {
url = it.nextOrError(e);
} catch (SolrServerException ex) {
apiFuture.completeExceptionally(e);
return;
}
MDC.put("LBSolrClient.url", url.toString());
if (!apiFuture.isCancelled()) {
CompletableFuture<NamedList<Object>> future =
doAsyncRequest(url, req, rsp, isNonRetryable, it.isServingZombieServer(), this);
currentFuture.set(future);
}
} else {
apiFuture.completeExceptionally(e);
}
}
};
try {
CompletableFuture<NamedList<Object>> future =
doAsyncRequest(
it.nextOrError(),
req,
rsp,
isNonRetryable,
it.isServingZombieServer(),
retryListener);
currentFuture.set(future);
} catch (SolrServerException e) {
apiFuture.completeExceptionally(e);
return apiFuture;
}
apiFuture.exceptionally(
(error) -> {
if (apiFuture.isCancelled()) {
currentFuture.get().cancel(true);
}
return null;
});
return apiFuture;
}

private interface RetryListener {
void onSuccess(Rsp rsp);

void onFailure(Exception e, boolean retryReq);
}

private CompletableFuture<NamedList<Object>> doAsyncRequest(
Endpoint endpoint,
Req req,
Rsp rsp,
boolean isNonRetryable,
boolean isZombie,
RetryListener listener) {
String baseUrl = endpoint.toString();
rsp.server = baseUrl;
req.getRequest().setBasePath(baseUrl);
CompletableFuture<NamedList<Object>> future =
((Http2SolrClient) getClient(endpoint)).requestAsync(req.getRequest());
future.whenComplete(
(result, throwable) -> {
if (!future.isCompletedExceptionally()) {
onSuccessfulRequest(result, endpoint, rsp, isZombie, listener);
} else if (!future.isCancelled()) {
onFailedRequest(throwable, endpoint, isNonRetryable, isZombie, listener);
}
});
return future;
}

private void onSuccessfulRequest(
NamedList<Object> result,
Endpoint endpoint,
Rsp rsp,
boolean isZombie,
RetryListener listener) {
rsp.rsp = result;
if (isZombie) {
zombieServers.remove(endpoint.toString());
}
listener.onSuccess(rsp);
}

private void onFailedRequest(
Throwable oe,
Endpoint endpoint,
boolean isNonRetryable,
boolean isZombie,
RetryListener listener) {
try {
throw (Exception) oe;
} catch (BaseHttpSolrClient.RemoteExecutionException e) {
listener.onFailure(e, false);
} catch (SolrException e) {
// we retry on 404 or 403 or 503 or 500
// unless it's an update - then we only retry on connect exception
if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
} else {
// Server is alive but the request was likely malformed or invalid
if (isZombie) {
zombieServers.remove(endpoint.toString());
}
listener.onFailure(e, false);
}
} catch (SocketException e) {
if (!isNonRetryable || e instanceof ConnectException) {
listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
} else {
listener.onFailure(e, false);
}
} catch (SocketTimeoutException e) {
if (!isNonRetryable) {
listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
} else {
listener.onFailure(e, false);
}
} catch (SolrServerException e) {
Throwable rootCause = e.getRootCause();
if (!isNonRetryable && rootCause instanceof IOException) {
listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
} else if (isNonRetryable && rootCause instanceof ConnectException) {
listener.onFailure((!isZombie) ? addZombie(endpoint, e) : e, true);
} else {
listener.onFailure(e, false);
}
} catch (Exception e) {
listener.onFailure(new SolrServerException(e), false);
}
}

public static class Builder {

private final Http2SolrClient http2SolrClient;
private final Endpoint[] solrEndpoints;
private long aliveCheckIntervalMillis =
TimeUnit.MILLISECONDS.convert(60, TimeUnit.SECONDS); // 1 minute between checks
protected String defaultCollection;
public static class Builder
extends LBHttpSolrClientBuilderBase<
LBHttp2SolrClient, LBHttp2SolrClient.Builder, Http2SolrClient> {

public Builder(Http2SolrClient http2Client, Endpoint... endpoints) {
this.http2SolrClient = http2Client;
this.solrEndpoints = endpoints;
}

/**
* LBHttpSolrServer keeps pinging the dead servers at fixed interval to find if it is alive. Use
* this to set that interval
*
* @param aliveCheckInterval how often to ping for aliveness
*/
public LBHttp2SolrClient.Builder setAliveCheckInterval(int aliveCheckInterval, TimeUnit unit) {
if (aliveCheckInterval <= 0) {
throw new IllegalArgumentException(
"Alive check interval must be " + "positive, specified value = " + aliveCheckInterval);
}
this.aliveCheckIntervalMillis = TimeUnit.MILLISECONDS.convert(aliveCheckInterval, unit);
return this;
}

/** Sets a default for core or collection based requests. */
public LBHttp2SolrClient.Builder withDefaultCollection(String defaultCoreOrCollection) {
this.defaultCollection = defaultCoreOrCollection;
return this;
super(http2Client, endpoints);
}

@Override
public LBHttp2SolrClient build() {
return new LBHttp2SolrClient(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.impl;

import org.apache.solr.client.solrj.SolrClient;

public class LBHttpJdkSolrClient extends LBHttpSolrClientBase<HttpJdkSolrClient> {

protected LBHttpJdkSolrClient(Builder b) {
super(b);
}

@Override
protected SolrClient getClient(Endpoint endpoint) {
return solrClient;
}

public static class Builder
extends LBHttpSolrClientBuilderBase<
LBHttpJdkSolrClient, LBHttpJdkSolrClient.Builder, HttpJdkSolrClient> {

public Builder(HttpJdkSolrClient httpJdkClient, Endpoint... endpoints) {
super(httpJdkClient, endpoints);
}

@Override
public LBHttpJdkSolrClient build() {
return new LBHttpJdkSolrClient(this);
}
}
}
Loading
Loading