Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Add snapshot type information in repository data. #13827

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

100 changes: 85 additions & 15 deletions server/src/main/java/org/opensearch/repositories/RepositoryData.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.snapshots.SnapshotType;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -92,7 +93,8 @@ public final class RepositoryData {
Collections.emptyMap(),
Collections.emptyMap(),
ShardGenerations.EMPTY,
IndexMetaDataGenerations.EMPTY
IndexMetaDataGenerations.EMPTY,
Collections.emptyMap()
);

/**
Expand Down Expand Up @@ -128,14 +130,20 @@ public final class RepositoryData {
*/
private final ShardGenerations shardGenerations;

/**
* snapshot UUID to snapshot type map.
*/
private final Map<String, SnapshotType> snapshotTypes;

public RepositoryData(
long genId,
Map<String, SnapshotId> snapshotIds,
Map<String, SnapshotState> snapshotStates,
Map<String, Version> snapshotVersions,
Map<IndexId, List<SnapshotId>> indexSnapshots,
ShardGenerations shardGenerations,
IndexMetaDataGenerations indexMetaDataGenerations
IndexMetaDataGenerations indexMetaDataGenerations,
Map<String, SnapshotType> snapshotTypes
) {
this(
genId,
Expand All @@ -145,7 +153,8 @@ public RepositoryData(
Collections.unmodifiableMap(indexSnapshots.keySet().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()))),
Collections.unmodifiableMap(indexSnapshots),
shardGenerations,
indexMetaDataGenerations
indexMetaDataGenerations,
snapshotTypes
);
}

Expand All @@ -157,7 +166,8 @@ private RepositoryData(
Map<String, IndexId> indices,
Map<IndexId, List<SnapshotId>> indexSnapshots,
ShardGenerations shardGenerations,
IndexMetaDataGenerations indexMetaDataGenerations
IndexMetaDataGenerations indexMetaDataGenerations,
Map<String, SnapshotType> snapshotTypes
) {
this.genId = genId;
this.snapshotIds = snapshotIds;
Expand All @@ -167,6 +177,7 @@ private RepositoryData(
this.shardGenerations = shardGenerations;
this.indexMetaDataGenerations = indexMetaDataGenerations;
this.snapshotVersions = snapshotVersions;
this.snapshotTypes = snapshotTypes;
assert indices.values().containsAll(shardGenerations.indices()) : "ShardGenerations contained indices "
+ shardGenerations.indices()
+ " but snapshots only reference indices "
Expand All @@ -183,7 +194,8 @@ protected RepositoryData copy() {
snapshotVersions,
indexSnapshots,
shardGenerations,
indexMetaDataGenerations
indexMetaDataGenerations,
snapshotTypes
);
}

Expand All @@ -205,7 +217,31 @@ public RepositoryData withVersions(Map<SnapshotId, Version> versions) {
newVersions,
indexSnapshots,
shardGenerations,
indexMetaDataGenerations
indexMetaDataGenerations,
snapshotTypes
);
}

/**
* Creates a copy of this instance that contains updated snapshot type data.
* @param newSnapshotTypes map of snapshot tp snapshot types
* @return copy with updated snapshot type data
*/
public RepositoryData withSnapshotTypes(Map<SnapshotId, SnapshotType> newSnapshotTypes) {
if (newSnapshotTypes.isEmpty()) {
return this;
}
final Map<String, SnapshotType> udpatedSnapshotTypes = new HashMap<>(snapshotTypes);
newSnapshotTypes.forEach((id, type) -> udpatedSnapshotTypes.put(id.getUUID(), type));
return new RepositoryData(
genId,
snapshotIds,
snapshotStates,
snapshotVersions,
indexSnapshots,
shardGenerations,
indexMetaDataGenerations,
udpatedSnapshotTypes
);
}

Expand Down Expand Up @@ -243,6 +279,13 @@ public Version getVersion(SnapshotId snapshotId) {
return snapshotVersions.get(snapshotId.getUUID());
}

/**
* Returns the {@link SnapshotType} for the given snapshot or {@code null} if unknown.
*/
public SnapshotType getSnapshotType(SnapshotId snapshotId) {
return this.snapshotTypes.get(snapshotId.getUUID());
}

/**
* Returns an unmodifiable map of the index names to {@link IndexId} in the repository.
*/
Expand Down Expand Up @@ -319,7 +362,8 @@ public RepositoryData addSnapshot(
final Version version,
final ShardGenerations shardGenerations,
@Nullable final Map<IndexId, String> indexMetaBlobs,
@Nullable final Map<String, String> newIdentifiers
@Nullable final Map<String, String> newIdentifiers,
final SnapshotType snapshotType
) {
if (snapshotIds.containsKey(snapshotId.getUUID())) {
// if the snapshot id already exists in the repository data, it means an old master
Expand All @@ -333,6 +377,10 @@ public RepositoryData addSnapshot(
newSnapshotStates.put(snapshotId.getUUID(), snapshotState);
Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
newSnapshotVersions.put(snapshotId.getUUID(), version);

Map<String, SnapshotType> newSnapshotTypes = new HashMap<>(snapshotTypes);
newSnapshotTypes.put(snapshotId.getUUID(), snapshotType);

Map<IndexId, List<SnapshotId>> allIndexSnapshots = new HashMap<>(indexSnapshots);
for (final IndexId indexId : shardGenerations.indices()) {
final List<SnapshotId> snapshotIds = allIndexSnapshots.get(indexId);
Expand Down Expand Up @@ -369,7 +417,8 @@ public RepositoryData addSnapshot(
newSnapshotVersions,
allIndexSnapshots,
ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build(),
newIndexMetaGenerations
newIndexMetaGenerations,
newSnapshotTypes
);
}

Expand All @@ -391,7 +440,8 @@ public RepositoryData withGenId(long newGeneration) {
indices,
indexSnapshots,
shardGenerations,
indexMetaDataGenerations
indexMetaDataGenerations,
snapshotTypes
);
}

Expand All @@ -415,9 +465,11 @@ public RepositoryData removeSnapshots(final Collection<SnapshotId> snapshots, fi
}
Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
final Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
final Map<String, SnapshotType> newSnapshotTypes = new HashMap<>(snapshotTypes);
for (SnapshotId snapshotId : snapshots) {
newSnapshotStates.remove(snapshotId.getUUID());
newSnapshotVersions.remove(snapshotId.getUUID());
newSnapshotTypes.remove(snapshotId.getUUID());
}
Map<IndexId, List<SnapshotId>> indexSnapshots = new HashMap<>();
for (final IndexId indexId : indices.values()) {
Expand Down Expand Up @@ -445,7 +497,8 @@ public RepositoryData removeSnapshots(final Collection<SnapshotId> snapshots, fi
.putAll(updatedShardGenerations)
.retainIndicesAndPruneDeletes(indexSnapshots.keySet())
.build(),
indexMetaDataGenerations.withRemovedSnapshots(snapshots)
indexMetaDataGenerations.withRemovedSnapshots(snapshots),
newSnapshotTypes
);
}

Expand Down Expand Up @@ -475,7 +528,8 @@ public boolean equals(Object obj) {
&& indices.equals(that.indices)
&& indexSnapshots.equals(that.indexSnapshots)
&& shardGenerations.equals(that.shardGenerations)
&& indexMetaDataGenerations.equals(that.indexMetaDataGenerations);
&& indexMetaDataGenerations.equals(that.indexMetaDataGenerations)
&& snapshotTypes.equals(that.snapshotTypes);
}

@Override
Expand All @@ -487,7 +541,8 @@ public int hashCode() {
indices,
indexSnapshots,
shardGenerations,
indexMetaDataGenerations
indexMetaDataGenerations,
snapshotTypes
);
}

Expand Down Expand Up @@ -542,6 +597,7 @@ public List<IndexId> resolveNewIndices(List<String> indicesToResolve, Map<String
private static final String UUID = "uuid";
private static final String STATE = "state";
private static final String VERSION = "version";
private static final String SNAPSHOT_TYPE = "snapshot_type";
private static final String MIN_VERSION = "min_version";

/**
Expand Down Expand Up @@ -570,6 +626,10 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
if (version != null) {
builder.field(VERSION, version.toString());
}
final SnapshotType snapshotType = snapshotTypes.get(snapshotUUID);
if (snapshotType != null) {
builder.field(SNAPSHOT_TYPE, snapshotType.toString());
}
builder.endObject();
}
builder.endArray();
Expand Down Expand Up @@ -611,6 +671,7 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g
final Map<String, SnapshotId> snapshots = new HashMap<>();
final Map<String, SnapshotState> snapshotStates = new HashMap<>();
final Map<String, Version> snapshotVersions = new HashMap<>();
final Map<String, SnapshotType> snapshotTypes = new HashMap<>();
final Map<IndexId, List<SnapshotId>> indexSnapshots = new HashMap<>();
final Map<String, IndexId> indexLookup = new HashMap<>();
final ShardGenerations.Builder shardGenerations = ShardGenerations.builder();
Expand All @@ -620,7 +681,7 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g
final String field = parser.currentName();
switch (field) {
case SNAPSHOTS:
parseSnapshots(parser, snapshots, snapshotStates, snapshotVersions, indexMetaLookup);
parseSnapshots(parser, snapshots, snapshotStates, snapshotVersions, indexMetaLookup, snapshotTypes);
break;
case INDICES:
parseIndices(parser, snapshots, indexSnapshots, indexLookup, shardGenerations);
Expand All @@ -646,7 +707,8 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g
snapshotVersions,
indexSnapshots,
shardGenerations.build(),
buildIndexMetaGenerations(indexMetaLookup, indexLookup, indexMetaIdentifiers)
buildIndexMetaGenerations(indexMetaLookup, indexLookup, indexMetaIdentifiers),
snapshotTypes
);
}

Expand Down Expand Up @@ -697,7 +759,8 @@ private static void parseSnapshots(
Map<String, SnapshotId> snapshots,
Map<String, SnapshotState> snapshotStates,
Map<String, Version> snapshotVersions,
Map<SnapshotId, Map<String, String>> indexMetaLookup
Map<SnapshotId, Map<String, String>> indexMetaLookup,
Map<String, SnapshotType> snapshotTypes
) throws IOException {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.nextToken(), parser);
final Map<String, String> stringDeduplicator = new HashMap<>();
Expand All @@ -707,6 +770,7 @@ private static void parseSnapshots(
SnapshotState state = null;
Map<String, String> metaGenerations = null;
Version version = null;
SnapshotType snapshotType = null;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String currentFieldName = parser.currentName();
parser.nextToken();
Expand All @@ -726,6 +790,9 @@ private static void parseSnapshots(
case VERSION:
version = Version.fromString(parser.text());
break;
case SNAPSHOT_TYPE:
snapshotType = SnapshotType.fromString(parser.text());
break;
}
}
final SnapshotId snapshotId = new SnapshotId(name, uuid);
Expand All @@ -735,6 +802,9 @@ private static void parseSnapshots(
if (version != null) {
snapshotVersions.put(uuid, version);
}
if (snapshotType != null) {
snapshotTypes.put(uuid, snapshotType);
}
snapshots.put(uuid, snapshotId);
if (metaGenerations != null && metaGenerations.isEmpty() == false) {
indexMetaLookup.put(snapshotId, metaGenerations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotMissingException;
import org.opensearch.snapshots.SnapshotType;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -1722,7 +1723,8 @@ public void finalizeSnapshot(
Version.CURRENT,
shardGenerations,
indexMetas,
indexMetaIdentifiers
indexMetaIdentifiers,
snapshotInfo.getSnapshotType()
);
writeIndexGen(
updatedRepositoryData,
Expand Down Expand Up @@ -2335,22 +2337,23 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS

// Step 2: Write new index-N blob to repository and update index.latest
setPendingStep.whenComplete(newGen -> threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
// BwC logic: Load snapshot version information if any snapshot is missing a version in RepositoryData so that the new
// RepositoryData contains a version for every snapshot
final List<SnapshotId> snapshotIdsWithoutVersion = repositoryData.getSnapshotIds()
// BwC logic: Load snapshot version or snapshot type information if any snapshot is missing a version or
// type in RepositoryData so that the new RepositoryData contains version and type for every snapshot
final List<SnapshotId> snapshotIdsWithoutVersionOrType = repositoryData.getSnapshotIds()
.stream()
.filter(snapshotId -> repositoryData.getVersion(snapshotId) == null)
harishbhakuni marked this conversation as resolved.
Show resolved Hide resolved
.collect(Collectors.toList());
if (snapshotIdsWithoutVersion.isEmpty() == false) {
if (snapshotIdsWithoutVersionOrType.isEmpty() == false) {
final Map<SnapshotId, Version> updatedVersionMap = new ConcurrentHashMap<>();
final GroupedActionListener<Void> loadAllVersionsListener = new GroupedActionListener<>(
final Map<SnapshotId, SnapshotType> updatedSnapshotTypeMap = new ConcurrentHashMap<>();
final GroupedActionListener<Void> loadMissingDataListener = new GroupedActionListener<>(
ActionListener.runAfter(new ActionListener<Collection<Void>>() {
@Override
public void onResponse(Collection<Void> voids) {
logger.info(
"Successfully loaded all snapshot's version information for {} from snapshot metadata",
"Successfully loaded all snapshot's version and type information for {} from snapshot metadata",
AllocationService.firstListElementsToCommaDelimitedString(
snapshotIdsWithoutVersion,
snapshotIdsWithoutVersionOrType,
SnapshotId::toString,
logger.isDebugEnabled()
)
Expand All @@ -2359,19 +2362,20 @@ public void onResponse(Collection<Void> voids) {

@Override
public void onFailure(Exception e) {
logger.warn("Failure when trying to load missing version information from snapshot metadata", e);
logger.warn("Failure when trying to load missing version or type information from snapshot metadata", e);
}
}, () -> filterRepositoryDataStep.onResponse(repositoryData.withVersions(updatedVersionMap))),
snapshotIdsWithoutVersion.size()
},
() -> filterRepositoryDataStep.onResponse(
repositoryData.withVersions(updatedVersionMap).withSnapshotTypes(updatedSnapshotTypeMap)
)
),
snapshotIdsWithoutVersionOrType.size()
);
for (SnapshotId snapshotId : snapshotIdsWithoutVersion) {
threadPool().executor(ThreadPool.Names.SNAPSHOT)
.execute(
ActionRunnable.run(
loadAllVersionsListener,
() -> updatedVersionMap.put(snapshotId, getSnapshotInfo(snapshotId).version())
)
);
for (SnapshotId snapshotId : snapshotIdsWithoutVersionOrType) {
threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(loadMissingDataListener, () -> {
updatedVersionMap.put(snapshotId, getSnapshotInfo(snapshotId).version());
updatedSnapshotTypeMap.put(snapshotId, getSnapshotInfo(snapshotId).getSnapshotType());
}));
}
} else {
filterRepositoryDataStep.onResponse(repositoryData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,13 @@ public Boolean isRemoteStoreIndexShallowCopyEnabled() {
return remoteStoreIndexShallowCopy;
}

public SnapshotType getSnapshotType() {
if (remoteStoreIndexShallowCopy != null && remoteStoreIndexShallowCopy) {
return SnapshotType.SHALLOW_COPY;
}
return SnapshotType.FULL_COPY;
}

/**
* Returns shard failures; an empty list will be returned if there were no shard
* failures, or if {@link #state()} returns {@code null}.
Expand Down
Loading
Loading