Skip to content

Commit

Permalink
SOLR-16470: Create v2 replication "fetch file" API (#2734)
Browse files Browse the repository at this point in the history
New v2 API is available at `GET /api/cores/coreName/replication/files/fileName`

---------

Co-authored-by: Matthew Biscocho <mbiscocho@bloomberg.net>
Co-authored-by: Jason Gerlowski <gerlowskija@apache.org>
  • Loading branch information
3 people authored Nov 22, 2024
1 parent 620175a commit 6f94c50
Show file tree
Hide file tree
Showing 18 changed files with 624 additions and 353 deletions.
3 changes: 3 additions & 0 deletions solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ Improvements
New APIs for listing-all and fetching-single cluster props are also now available at `GET /api/cluster/properties` and
`GET /api/cluster/properties/somePropName`, respectively. (Carlos Ugarte via Jason Gerlowski)

* SOLR-16470: Replication "fetch file" API now has a v2 equivalent, available at `GET /api/cores/coreName/replication/files/fileName`
(Matthew Biscocho via Jason Gerlowski)


Optimizations
---------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@
*/
package org.apache.solr.client.api.endpoint;

import static org.apache.solr.client.api.util.Constants.OMIT_FROM_CODEGEN_PROPERTY;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.extensions.Extension;
import io.swagger.v3.oas.annotations.extensions.ExtensionProperty;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.StreamingOutput;
import java.io.IOException;
import org.apache.solr.client.api.model.FileListResponse;
import org.apache.solr.client.api.model.IndexVersionResponse;
Expand All @@ -47,4 +54,43 @@ FileListResponse fetchFileList(
@Parameter(description = "The generation number of the index", required = true)
@QueryParam("generation")
long gen);

@GET
@CoreApiParameters
@Operation(
summary = "Get a stream of a specific file path of a core",
tags = {"core-replication"},
extensions = { // TODO Remove as a part of SOLR-17562
@Extension(
properties = {@ExtensionProperty(name = OMIT_FROM_CODEGEN_PROPERTY, value = "true")})
})
@Path("/files/{filePath}")
StreamingOutput fetchFile(
@PathParam("filePath") String filePath,
@Parameter(
description =
"Directory type for specific filePath (cf or tlogFile). Defaults to Lucene index (file) directory if empty",
required = true)
@QueryParam("dirType")
String dirType,
@Parameter(description = "Output stream read/write offset", required = false)
@QueryParam("offset")
String offset,
@Parameter(required = false) @QueryParam("len") String len,
@Parameter(description = "Compress file output", required = false)
@QueryParam("compression")
@DefaultValue("false")
Boolean compression,
@Parameter(description = "Write checksum with output stream", required = false)
@QueryParam("checksum")
@DefaultValue("false")
Boolean checksum,
@Parameter(
description = "Limit data write per seconds. Defaults to no throttling",
required = false)
@QueryParam("maxWriteMBPerSec")
double maxWriteMBPerSec,
@Parameter(description = "The generation number of the index", required = false)
@QueryParam("generation")
Long gen);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.IndexFetcher;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.handler.admin.api.ReplicationAPIBase;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.update.CommitUpdateCommand;
Expand Down Expand Up @@ -89,7 +90,7 @@ public void startReplication(boolean switchTransactionLog) {
NamedList<Object> followerConfig = new NamedList<>();
followerConfig.add(ReplicationHandler.FETCH_FROM_LEADER, Boolean.TRUE);
followerConfig.add(ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO, Boolean.TRUE);
followerConfig.add(ReplicationHandler.POLL_INTERVAL, pollIntervalStr);
followerConfig.add(ReplicationAPIBase.POLL_INTERVAL, pollIntervalStr);
NamedList<Object> replicationConfig = new NamedList<>();
replicationConfig.add("follower", followerConfig);

Expand Down
8 changes: 4 additions & 4 deletions solr/core/src/java/org/apache/solr/core/SolrCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@
import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager.SnapshotMetaData;
import org.apache.solr.handler.IndexFetcher;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.SolrConfigHandler;
import org.apache.solr.handler.admin.api.ReplicationAPIBase;
import org.apache.solr.handler.api.V2ApiUtils;
import org.apache.solr.handler.component.HighlightComponent;
import org.apache.solr.handler.component.SearchComponent;
Expand Down Expand Up @@ -3018,7 +3018,7 @@ public PluginBag<QueryResponseWriter> getResponseWriters() {
m.put("schema.xml", new SchemaXmlResponseWriter());
m.put("smile", new SmileResponseWriter());
m.put(PROMETHEUS_METRICS_WT, new PrometheusResponseWriter());
m.put(ReplicationHandler.FILE_STREAM, getFileStreamWriter());
m.put(ReplicationAPIBase.FILE_STREAM, getFileStreamWriter());
DEFAULT_RESPONSE_WRITERS = Collections.unmodifiableMap(m);
try {
m.put(
Expand All @@ -3037,7 +3037,7 @@ private static BinaryResponseWriter getFileStreamWriter() {
@Override
public void write(OutputStream out, SolrQueryRequest req, SolrQueryResponse response)
throws IOException {
RawWriter rawWriter = (RawWriter) response.getValues().get(ReplicationHandler.FILE_STREAM);
RawWriter rawWriter = (RawWriter) response.getValues().get(ReplicationAPIBase.FILE_STREAM);
if (rawWriter != null) {
rawWriter.write(out);
if (rawWriter instanceof Closeable) ((Closeable) rawWriter).close();
Expand All @@ -3046,7 +3046,7 @@ public void write(OutputStream out, SolrQueryRequest req, SolrQueryResponse resp

@Override
public String getContentType(SolrQueryRequest request, SolrQueryResponse response) {
RawWriter rawWriter = (RawWriter) response.getValues().get(ReplicationHandler.FILE_STREAM);
RawWriter rawWriter = (RawWriter) response.getValues().get(ReplicationAPIBase.FILE_STREAM);
if (rawWriter != null) {
return rawWriter.getContentType();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.solr.filestore;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE_STREAM;
import static org.apache.solr.response.RawResponseWriter.CONTENT;
import static org.apache.solr.security.PermissionNameProvider.Name.FILESTORE_READ_PERM;

Expand Down
5 changes: 3 additions & 2 deletions solr/core/src/java/org/apache/solr/handler/BlobHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.admin.api.GetBlobInfoAPI;
import org.apache.solr.handler.admin.api.ReplicationAPIBase;
import org.apache.solr.handler.admin.api.UploadBlobAPI;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
Expand Down Expand Up @@ -194,7 +195,7 @@ public void handleRequestBody(final SolrQueryRequest req, SolrQueryResponse rsp)
return;
}
}
if (ReplicationHandler.FILE_STREAM.equals(req.getParams().get(CommonParams.WT))) {
if (ReplicationAPIBase.FILE_STREAM.equals(req.getParams().get(CommonParams.WT))) {
if (blobName == null) {
throw new SolrException(
SolrException.ErrorCode.NOT_FOUND,
Expand All @@ -211,7 +212,7 @@ public void handleRequestBody(final SolrQueryRequest req, SolrQueryResponse rsp)
new Sort(new SortField("version", SortField.Type.LONG, true)));
if (docs.totalHits.value > 0) {
rsp.add(
ReplicationHandler.FILE_STREAM,
ReplicationAPIBase.FILE_STREAM,
new SolrCore.RawWriter() {

@Override
Expand Down
5 changes: 3 additions & 2 deletions solr/core/src/java/org/apache/solr/handler/ExportHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.admin.api.ReplicationAPIBase;
import org.apache.solr.handler.component.SearchHandler;
import org.apache.solr.handler.export.ExportWriter;
import org.apache.solr.handler.export.ExportWriterStream;
Expand Down Expand Up @@ -125,10 +126,10 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
}
String wt = req.getParams().get(CommonParams.WT, JSON);
if ("xsort".equals(wt)) wt = JSON;
Map<String, String> map = Map.of(CommonParams.WT, ReplicationHandler.FILE_STREAM);
Map<String, String> map = Map.of(CommonParams.WT, ReplicationAPIBase.FILE_STREAM);
req.setParams(SolrParams.wrapDefaults(new MapSolrParams(map), req.getParams()));
rsp.add(
ReplicationHandler.FILE_STREAM,
ReplicationAPIBase.FILE_STREAM,
new ExportWriter(
req, rsp, wt, initialStreamContext, solrMetricsContext, writerMetricsPath));
}
Expand Down
32 changes: 15 additions & 17 deletions solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,25 @@
import static org.apache.solr.common.params.CommonParams.JAVABIN;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.handler.ReplicationHandler.ALIAS;
import static org.apache.solr.handler.ReplicationHandler.CHECKSUM;
import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE;
import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE_LIST;
import static org.apache.solr.handler.ReplicationHandler.CMD_INDEX_VERSION;
import static org.apache.solr.handler.ReplicationHandler.COMMAND;
import static org.apache.solr.handler.ReplicationHandler.COMPRESSION;
import static org.apache.solr.handler.ReplicationHandler.CONF_FILES;
import static org.apache.solr.handler.ReplicationHandler.CONF_FILE_SHORT;
import static org.apache.solr.handler.ReplicationHandler.EXTERNAL;
import static org.apache.solr.handler.ReplicationHandler.FETCH_FROM_LEADER;
import static org.apache.solr.handler.ReplicationHandler.FILE;
import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
import static org.apache.solr.handler.ReplicationHandler.GENERATION;
import static org.apache.solr.handler.ReplicationHandler.INTERNAL;
import static org.apache.solr.handler.ReplicationHandler.LEADER_URL;
import static org.apache.solr.handler.ReplicationHandler.LEGACY_LEADER_URL;
import static org.apache.solr.handler.ReplicationHandler.LEGACY_SKIP_COMMIT_ON_LEADER_VERSION_ZERO;
import static org.apache.solr.handler.ReplicationHandler.OFFSET;
import static org.apache.solr.handler.ReplicationHandler.SIZE;
import static org.apache.solr.handler.ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO;
import static org.apache.solr.handler.admin.api.ReplicationAPIBase.CHECKSUM;
import static org.apache.solr.handler.admin.api.ReplicationAPIBase.COMPRESSION;
import static org.apache.solr.handler.admin.api.ReplicationAPIBase.CONF_FILE_SHORT;
import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE;
import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE_STREAM;
import static org.apache.solr.handler.admin.api.ReplicationAPIBase.GENERATION;
import static org.apache.solr.handler.admin.api.ReplicationAPIBase.OFFSET;

import java.io.File;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -121,7 +119,7 @@
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ReplicationHandler.FileInfo;
import org.apache.solr.handler.admin.api.ReplicationAPIBase;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.SolrIndexSearcher;
Expand Down Expand Up @@ -303,8 +301,8 @@ public IndexFetcher(

this.replicationHandler = handler;
String compress = (String) initArgs.get(COMPRESSION);
useInternalCompression = INTERNAL.equals(compress);
useExternalCompression = EXTERNAL.equals(compress);
useInternalCompression = ReplicationHandler.INTERNAL.equals(compress);
useExternalCompression = ReplicationHandler.EXTERNAL.equals(compress);
connTimeout = getParameter(initArgs, HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000, null);

// allow a leader override for tests - you specify this in /replication follower section of
Expand Down Expand Up @@ -1575,7 +1573,7 @@ private String getDateAsStr(Date d) {
return new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(d);
}

private final Map<String, FileInfo> confFileInfoCache = new HashMap<>();
private final Map<String, ReplicationHandler.FileInfo> confFileInfoCache = new HashMap<>();

/**
* The local conf files are compared with the conf files in the leader. If they are same (by
Expand Down Expand Up @@ -1723,7 +1721,7 @@ private interface FileInterface {
* The class acts as a client for ReplicationHandler.FileStream. It understands the protocol of
* wt=filestream
*
* @see org.apache.solr.handler.ReplicationHandler.DirectoryFileStream
* <p>see org.apache.solr.handler.admin.api.ReplicationAPIBase.DirectoryFileStream
*/
private class FileFetcher {
private final FileInterface file;
Expand All @@ -1750,7 +1748,7 @@ private class FileFetcher {
this.file = file;
this.fileName = (String) fileDetails.get(NAME);
this.size = (Long) fileDetails.get(SIZE);
buf = new byte[(int) Math.min(this.size, ReplicationHandler.PACKET_SZ)];
buf = new byte[(int) Math.min(this.size, ReplicationAPIBase.PACKET_SZ)];
this.solrParamOutput = solrParamOutput;
this.saveAs = saveAs;
indexGen = latestGen;
Expand Down Expand Up @@ -2047,7 +2045,7 @@ public void delete() throws Exception {
}
}

private class DirectoryFileFetcher extends FileFetcher {
protected class DirectoryFileFetcher extends FileFetcher {
DirectoryFileFetcher(
Directory tmpIndexDir,
Map<String, Object> fileDetails,
Expand Down Expand Up @@ -2107,7 +2105,7 @@ public void delete() throws Exception {
}
}

private class LocalFsFileFetcher extends FileFetcher {
protected class LocalFsFileFetcher extends FileFetcher {
LocalFsFileFetcher(
File dir,
Map<String, Object> fileDetails,
Expand Down
Loading

0 comments on commit 6f94c50

Please sign in to comment.