Skip to content

Commit

Permalink
refactoring to allow jdk client to partcipate in changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jdyer1 committed Nov 6, 2024
1 parent c6d3509 commit 0526351
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
*/
package org.apache.solr.client.solrj;

import org.apache.solr.client.solrj.impl.HttpSolrClientBase;

import java.io.IOException;

/** A lambda intended for invoking SolrClient operations */
@FunctionalInterface
public interface SolrClientFunction<C extends SolrClient, R> {
R apply(C c) throws IOException, SolrServerException;
public interface SolrClientFunction<C extends HttpSolrClientBase, R> {
R apply(HttpSolrClientBase c) throws IOException, SolrServerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -548,34 +548,10 @@ public NamedList<Object> request(SolrRequest<?> solrRequest, String collection)
}
}

/**
* Executes a SolrRequest using the provided URL to temporarily override any "base URL" currently
* used by this client
*
* @param baseUrl a URL to a root Solr path (i.e. "/solr") that should be used for this request
* @param collection an optional collection or core name used to override the client's "default
* collection". May be 'null' for any requests that don't require a collection or wish to rely
* on the client's default
* @param req the SolrRequest to send
*/
public final <R extends SolrResponse> R requestWithBaseUrl(
String baseUrl, String collection, SolrRequest<R> req)
throws SolrServerException, IOException {
return requestWithBaseUrl(baseUrl, (c) -> req.process(c, collection));
}

/**
* Temporarily modifies the client to use a different base URL and runs the provided lambda
*
* @param baseUrl the base URL to use on any requests made within the 'clientFunction' lambda
* @param clientFunction a Function that consumes a Http2SolrClient and returns an arbitrary value
* @return the value returned after invoking 'clientFunction'
* @param <R> the type returned by the provided function (and by this method)
*/
@Override
public <R> R requestWithBaseUrl(
String baseUrl, SolrClientFunction<Http2SolrClient, R> clientFunction)
throws SolrServerException, IOException {

String baseUrl, SolrClientFunction<? extends HttpSolrClientBase, R> clientFunction)
throws SolrServerException, IOException {
// Despite the name, try-with-resources used here to avoid IDE and ObjectReleaseTracker
// complaints
try (final var derivedClient = new NoCloseHttp2SolrClient(baseUrl, this)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrClientFunction;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.QueryRequest;
Expand All @@ -73,7 +74,7 @@ public class HttpJdkSolrClient extends HttpSolrClientBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final String USER_AGENT =
"Solr[" + MethodHandles.lookup().lookupClass().getName() + "] 1.0";
"Solr[" + MethodHandles.lookup().lookupClass().getName() + "] 1.0";

protected HttpClient httpClient;

Expand All @@ -87,9 +88,9 @@ protected HttpJdkSolrClient(String serverBaseUrl, HttpJdkSolrClient.Builder buil
super(serverBaseUrl, builder);

HttpClient.Redirect followRedirects =
Boolean.TRUE.equals(builder.followRedirects)
? HttpClient.Redirect.NORMAL
: HttpClient.Redirect.NEVER;
Boolean.TRUE.equals(builder.followRedirects)
? HttpClient.Redirect.NORMAL
: HttpClient.Redirect.NEVER;
HttpClient.Builder b = HttpClient.newBuilder().followRedirects(followRedirects);
if (builder.sslContext != null) {
b.sslContext(builder.sslContext);
Expand All @@ -101,13 +102,13 @@ protected HttpJdkSolrClient(String serverBaseUrl, HttpJdkSolrClient.Builder buil
} else {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1024);
this.executor =
new ExecutorUtil.MDCAwareThreadPoolExecutor(
4,
256,
60,
TimeUnit.SECONDS,
queue,
new SolrNamedThreadFactory(this.getClass().getSimpleName()));
new ExecutorUtil.MDCAwareThreadPoolExecutor(
4,
256,
60,
TimeUnit.SECONDS,
queue,
new SolrNamedThreadFactory(this.getClass().getSimpleName()));
this.shutdownExecutor = true;
}
b.executor(this.executor);
Expand All @@ -124,7 +125,7 @@ protected HttpJdkSolrClient(String serverBaseUrl, HttpJdkSolrClient.Builder buil
if (builder.proxyHost != null) {
if (builder.proxyIsSocks4) {
log.warn(
"Socks4 is likely not supported by this client. See https://bugs.openjdk.org/browse/JDK-8214516");
"Socks4 is likely not supported by this client. See https://bugs.openjdk.org/browse/JDK-8214516");
}
b.proxy(ProxySelector.of(new InetSocketAddress(builder.proxyHost, builder.proxyPort)));
}
Expand All @@ -136,30 +137,36 @@ protected HttpJdkSolrClient(String serverBaseUrl, HttpJdkSolrClient.Builder buil

@Override
public CompletableFuture<NamedList<Object>> requestAsync(
final SolrRequest<?> solrRequest, String collection) {
final SolrRequest<?> solrRequest, String collection) {
try {
PreparedRequest pReq = prepareRequest(solrRequest, collection);
return httpClient
.sendAsync(pReq.reqb.build(), HttpResponse.BodyHandlers.ofInputStream())
.thenApply(
httpResponse -> {
try {
return processErrorsAndResponse(
solrRequest, pReq.parserToUse, httpResponse, pReq.url);
} catch (SolrServerException e) {
throw new RuntimeException(e);
}
});
.sendAsync(pReq.reqb.build(), HttpResponse.BodyHandlers.ofInputStream())
.thenApply(
httpResponse -> {
try {
return processErrorsAndResponse(
solrRequest, pReq.parserToUse, httpResponse, pReq.url);
} catch (SolrServerException e) {
throw new RuntimeException(e);
}
});
} catch (Exception e) {
CompletableFuture<NamedList<Object>> cf = new CompletableFuture<>();
cf.completeExceptionally(e);
return cf;
}
}

@Override
public <R> R requestWithBaseUrl(
String baseUrl, SolrClientFunction<? extends HttpSolrClientBase, R> clientFunction) throws SolrServerException, IOException {
return null;
}

@Override
public NamedList<Object> request(SolrRequest<?> solrRequest, String collection)
throws SolrServerException, IOException {
throws SolrServerException, IOException {
PreparedRequest pReq = prepareRequest(solrRequest, collection);
HttpResponse<InputStream> response = null;
try {
Expand All @@ -170,7 +177,7 @@ public NamedList<Object> request(SolrRequest<?> solrRequest, String collection)
throw new RuntimeException(e);
} catch (HttpTimeoutException e) {
throw new SolrServerException(
"Timeout occurred while waiting response from server at: " + pReq.url, e);
"Timeout occurred while waiting response from server at: " + pReq.url, e);
} catch (SolrException se) {
throw se;
} catch (RuntimeException e) {
Expand All @@ -193,7 +200,7 @@ public NamedList<Object> request(SolrRequest<?> solrRequest, String collection)
}

private PreparedRequest prepareRequest(SolrRequest<?> solrRequest, String collection)
throws SolrServerException, IOException {
throws SolrServerException, IOException {
checkClosed();
if (ClientUtils.shouldApplyDefaultCollection(collection, solrRequest)) {
collection = defaultCollection;
Expand All @@ -206,20 +213,20 @@ private PreparedRequest prepareRequest(SolrRequest<?> solrRequest, String collec
try {
switch (solrRequest.getMethod()) {
case GET:
{
pReq = prepareGet(url, reqb, solrRequest, queryParams);
break;
}
{
pReq = prepareGet(url, reqb, solrRequest, queryParams);
break;
}
case POST:
case PUT:
{
pReq = preparePutOrPost(url, solrRequest.getMethod(), reqb, solrRequest, queryParams);
break;
}
{
pReq = preparePutOrPost(url, solrRequest.getMethod(), reqb, solrRequest, queryParams);
break;
}
default:
{
throw new IllegalStateException("Unsupported method: " + solrRequest.getMethod());
}
{
throw new IllegalStateException("Unsupported method: " + solrRequest.getMethod());
}
}
} catch (URISyntaxException | RuntimeException re) {
throw new SolrServerException(re);
Expand All @@ -230,11 +237,11 @@ private PreparedRequest prepareRequest(SolrRequest<?> solrRequest, String collec
}

private PreparedRequest prepareGet(
String url,
HttpRequest.Builder reqb,
SolrRequest<?> solrRequest,
ModifiableSolrParams queryParams)
throws IOException, URISyntaxException {
String url,
HttpRequest.Builder reqb,
SolrRequest<?> solrRequest,
ModifiableSolrParams queryParams)
throws IOException, URISyntaxException {
validateGetRequest(solrRequest);
reqb.GET();
decorateRequest(reqb, solrRequest);
Expand All @@ -243,12 +250,12 @@ private PreparedRequest prepareGet(
}

private PreparedRequest preparePutOrPost(
String url,
SolrRequest.METHOD method,
HttpRequest.Builder reqb,
SolrRequest<?> solrRequest,
ModifiableSolrParams queryParams)
throws IOException, URISyntaxException {
String url,
SolrRequest.METHOD method,
HttpRequest.Builder reqb,
SolrRequest<?> solrRequest,
ModifiableSolrParams queryParams)
throws IOException, URISyntaxException {

final RequestWriter.ContentWriter contentWriter = requestWriter.getContentWriter(solrRequest);

Expand Down Expand Up @@ -282,14 +289,14 @@ private PreparedRequest preparePutOrPost(
bodyPublisher = HttpRequest.BodyPublishers.ofInputStream(() -> sink);

contentWritingFuture =
executor.submit(
() -> {
try (source) {
contentWriter.write(source);
} catch (Exception e) {
log.error("Cannot write Content Stream", e);
}
});
executor.submit(
() -> {
try (source) {
contentWriter.write(source);
} catch (Exception e) {
log.error("Cannot write Content Stream", e);
}
});
} else if (streams != null && streams.size() == 1) {
boolean success = maybeTryHeadRequest(url);
if (!success) {
Expand Down Expand Up @@ -379,8 +386,8 @@ private synchronized boolean maybeTryHeadRequestSync(String url) {
return false;
}
HttpRequest.Builder headReqB =
HttpRequest.newBuilder(uriNoQueryParams)
.method("HEAD", HttpRequest.BodyPublishers.noBody());
HttpRequest.newBuilder(uriNoQueryParams)
.method("HEAD", HttpRequest.BodyPublishers.noBody());
decorateRequest(headReqB, new QueryRequest());
try {
httpClient.send(headReqB.build(), HttpResponse.BodyHandlers.discarding());
Expand Down Expand Up @@ -423,8 +430,8 @@ private void decorateRequest(HttpRequest.Builder reqb, SolrRequest<?> solrReques
private void setBasicAuthHeader(SolrRequest<?> solrRequest, HttpRequest.Builder reqb) {
if (solrRequest.getBasicAuthUser() != null && solrRequest.getBasicAuthPassword() != null) {
String encoded =
basicAuthCredentialsToAuthorizationString(
solrRequest.getBasicAuthUser(), solrRequest.getBasicAuthPassword());
basicAuthCredentialsToAuthorizationString(
solrRequest.getBasicAuthUser(), solrRequest.getBasicAuthPassword());
reqb.header("Authorization", encoded);
} else if (basicAuthAuthorizationStr != null) {
reqb.header("Authorization", basicAuthAuthorizationStr);
Expand All @@ -446,8 +453,8 @@ protected String contentTypeToEncoding(String contentType) {
}

private NamedList<Object> processErrorsAndResponse(
SolrRequest<?> solrRequest, ResponseParser parser, HttpResponse<InputStream> resp, String url)
throws SolrServerException {
SolrRequest<?> solrRequest, ResponseParser parser, HttpResponse<InputStream> resp, String url)
throws SolrServerException {
String contentType = resp.headers().firstValue("Content-Type").orElse(null);
contentType = contentType == null ? "" : contentType;
String mimeType = contentTypeToMimeType(contentType);
Expand All @@ -457,7 +464,7 @@ private NamedList<Object> processErrorsAndResponse(
String reason = "" + status;
InputStream is = resp.body();
return processErrorsAndResponse(
status, reason, method, parser, is, mimeType, encoding, isV2ApiRequest(solrRequest), url);
status, reason, method, parser, is, mimeType, encoding, isV2ApiRequest(solrRequest), url);
}

@Override
Expand Down Expand Up @@ -493,36 +500,36 @@ protected boolean isFollowRedirects() {

@Override
protected boolean processorAcceptsMimeType(
Collection<String> processorSupportedContentTypes, String mimeType) {
Collection<String> processorSupportedContentTypes, String mimeType) {
return processorSupportedContentTypes.stream()
.map(this::contentTypeToMimeType)
.filter(Objects::nonNull)
.map(String::trim)
.anyMatch(mimeType::equalsIgnoreCase);
.map(this::contentTypeToMimeType)
.filter(Objects::nonNull)
.map(String::trim)
.anyMatch(mimeType::equalsIgnoreCase);
}

@Override
protected void updateDefaultMimeTypeForParser() {
defaultParserMimeTypes =
parser.getContentTypes().stream()
.map(this::contentTypeToMimeType)
.filter(Objects::nonNull)
.map(s -> s.toLowerCase(Locale.ROOT).trim())
.collect(Collectors.toSet());
parser.getContentTypes().stream()
.map(this::contentTypeToMimeType)
.filter(Objects::nonNull)
.map(s -> s.toLowerCase(Locale.ROOT).trim())
.collect(Collectors.toSet());
}

@Override
protected String allProcessorSupportedContentTypesCommaDelimited(
Collection<String> processorSupportedContentTypes) {
Collection<String> processorSupportedContentTypes) {
return processorSupportedContentTypes.stream()
.map(this::contentTypeToMimeType)
.filter(Objects::nonNull)
.map(s -> s.toLowerCase(Locale.ROOT).trim())
.collect(Collectors.joining(", "));
.map(this::contentTypeToMimeType)
.filter(Objects::nonNull)
.map(s -> s.toLowerCase(Locale.ROOT).trim())
.collect(Collectors.joining(", "));
}

public static class Builder
extends HttpSolrClientBuilderBase<HttpJdkSolrClient.Builder, HttpJdkSolrClient> {
extends HttpSolrClientBuilderBase<HttpJdkSolrClient.Builder, HttpJdkSolrClient> {

private SSLContext sslContext;

Expand Down
Loading

0 comments on commit 0526351

Please sign in to comment.