Skip to content

Commit

Permalink
[Remote Store] Add snapshot type information in repository data.
Browse files Browse the repository at this point in the history
Signed-off-by: Harish Bhakuni <hbhakuni@amazon.com>
  • Loading branch information
Harish Bhakuni committed May 25, 2024
1 parent 56d8dc6 commit 4d8fa9c
Show file tree
Hide file tree
Showing 11 changed files with 460 additions and 111 deletions.

Large diffs are not rendered by default.

100 changes: 85 additions & 15 deletions server/src/main/java/org/opensearch/repositories/RepositoryData.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.snapshots.SnapshotType;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -92,7 +93,8 @@ public final class RepositoryData {
Collections.emptyMap(),
Collections.emptyMap(),
ShardGenerations.EMPTY,
IndexMetaDataGenerations.EMPTY
IndexMetaDataGenerations.EMPTY,
Collections.emptyMap()
);

/**
Expand Down Expand Up @@ -128,14 +130,20 @@ public final class RepositoryData {
*/
private final ShardGenerations shardGenerations;

/**
* snapshot UUID to snapshot type map.
*/
private final Map<String, SnapshotType> snapshotTypes;

public RepositoryData(
long genId,
Map<String, SnapshotId> snapshotIds,
Map<String, SnapshotState> snapshotStates,
Map<String, Version> snapshotVersions,
Map<IndexId, List<SnapshotId>> indexSnapshots,
ShardGenerations shardGenerations,
IndexMetaDataGenerations indexMetaDataGenerations
IndexMetaDataGenerations indexMetaDataGenerations,
Map<String, SnapshotType> snapshotTypes
) {
this(
genId,
Expand All @@ -145,7 +153,8 @@ public RepositoryData(
Collections.unmodifiableMap(indexSnapshots.keySet().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()))),
Collections.unmodifiableMap(indexSnapshots),
shardGenerations,
indexMetaDataGenerations
indexMetaDataGenerations,
snapshotTypes
);
}

Expand All @@ -157,7 +166,8 @@ private RepositoryData(
Map<String, IndexId> indices,
Map<IndexId, List<SnapshotId>> indexSnapshots,
ShardGenerations shardGenerations,
IndexMetaDataGenerations indexMetaDataGenerations
IndexMetaDataGenerations indexMetaDataGenerations,
Map<String, SnapshotType> snapshotTypes
) {
this.genId = genId;
this.snapshotIds = snapshotIds;
Expand All @@ -167,6 +177,7 @@ private RepositoryData(
this.shardGenerations = shardGenerations;
this.indexMetaDataGenerations = indexMetaDataGenerations;
this.snapshotVersions = snapshotVersions;
this.snapshotTypes = snapshotTypes;
assert indices.values().containsAll(shardGenerations.indices()) : "ShardGenerations contained indices "
+ shardGenerations.indices()
+ " but snapshots only reference indices "
Expand All @@ -183,7 +194,8 @@ protected RepositoryData copy() {
snapshotVersions,
indexSnapshots,
shardGenerations,
indexMetaDataGenerations
indexMetaDataGenerations,
snapshotTypes
);
}

Expand All @@ -205,7 +217,31 @@ public RepositoryData withVersions(Map<SnapshotId, Version> versions) {
newVersions,
indexSnapshots,
shardGenerations,
indexMetaDataGenerations
indexMetaDataGenerations,
snapshotTypes
);
}

/**
* Creates a copy of this instance that contains updated snapshot type data.
* @param newSnapshotTypes map of snapshot tp snapshot types
* @return copy with updated snapshot type data
*/
public RepositoryData withSnapshotTypes(Map<SnapshotId, SnapshotType> newSnapshotTypes) {
if (newSnapshotTypes.isEmpty()) {
return this;
}
final Map<String, SnapshotType> udpatedSnapshotTypes = new HashMap<>(snapshotTypes);
newSnapshotTypes.forEach((id, type) -> udpatedSnapshotTypes.put(id.getUUID(), type));
return new RepositoryData(
genId,
snapshotIds,
snapshotStates,
snapshotVersions,
indexSnapshots,
shardGenerations,
indexMetaDataGenerations,
udpatedSnapshotTypes
);
}

Expand Down Expand Up @@ -243,6 +279,13 @@ public Version getVersion(SnapshotId snapshotId) {
return snapshotVersions.get(snapshotId.getUUID());
}

/**
* Returns the {@link SnapshotType} for the given snapshot or {@code null} if unknown.
*/
public SnapshotType getSnapshotType(SnapshotId snapshotId) {
return this.snapshotTypes.get(snapshotId.getUUID());
}

/**
* Returns an unmodifiable map of the index names to {@link IndexId} in the repository.
*/
Expand Down Expand Up @@ -319,7 +362,8 @@ public RepositoryData addSnapshot(
final Version version,
final ShardGenerations shardGenerations,
@Nullable final Map<IndexId, String> indexMetaBlobs,
@Nullable final Map<String, String> newIdentifiers
@Nullable final Map<String, String> newIdentifiers,
final SnapshotType snapshotType
) {
if (snapshotIds.containsKey(snapshotId.getUUID())) {
// if the snapshot id already exists in the repository data, it means an old master
Expand All @@ -333,6 +377,10 @@ public RepositoryData addSnapshot(
newSnapshotStates.put(snapshotId.getUUID(), snapshotState);
Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
newSnapshotVersions.put(snapshotId.getUUID(), version);

Map<String, SnapshotType> newSnapshotTypes = new HashMap<>(snapshotTypes);
newSnapshotTypes.put(snapshotId.getUUID(), snapshotType);

Map<IndexId, List<SnapshotId>> allIndexSnapshots = new HashMap<>(indexSnapshots);
for (final IndexId indexId : shardGenerations.indices()) {
final List<SnapshotId> snapshotIds = allIndexSnapshots.get(indexId);
Expand Down Expand Up @@ -369,7 +417,8 @@ public RepositoryData addSnapshot(
newSnapshotVersions,
allIndexSnapshots,
ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build(),
newIndexMetaGenerations
newIndexMetaGenerations,
newSnapshotTypes
);
}

Expand All @@ -391,7 +440,8 @@ public RepositoryData withGenId(long newGeneration) {
indices,
indexSnapshots,
shardGenerations,
indexMetaDataGenerations
indexMetaDataGenerations,
snapshotTypes
);
}

Expand All @@ -415,9 +465,11 @@ public RepositoryData removeSnapshots(final Collection<SnapshotId> snapshots, fi
}
Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
final Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
final Map<String, SnapshotType> newSnapshotTypes = new HashMap<>(snapshotTypes);
for (SnapshotId snapshotId : snapshots) {
newSnapshotStates.remove(snapshotId.getUUID());
newSnapshotVersions.remove(snapshotId.getUUID());
newSnapshotTypes.remove(snapshotId.getUUID());
}
Map<IndexId, List<SnapshotId>> indexSnapshots = new HashMap<>();
for (final IndexId indexId : indices.values()) {
Expand Down Expand Up @@ -445,7 +497,8 @@ public RepositoryData removeSnapshots(final Collection<SnapshotId> snapshots, fi
.putAll(updatedShardGenerations)
.retainIndicesAndPruneDeletes(indexSnapshots.keySet())
.build(),
indexMetaDataGenerations.withRemovedSnapshots(snapshots)
indexMetaDataGenerations.withRemovedSnapshots(snapshots),
newSnapshotTypes
);
}

Expand Down Expand Up @@ -475,7 +528,8 @@ public boolean equals(Object obj) {
&& indices.equals(that.indices)
&& indexSnapshots.equals(that.indexSnapshots)
&& shardGenerations.equals(that.shardGenerations)
&& indexMetaDataGenerations.equals(that.indexMetaDataGenerations);
&& indexMetaDataGenerations.equals(that.indexMetaDataGenerations)
&& snapshotTypes.equals(that.snapshotTypes);
}

@Override
Expand All @@ -487,7 +541,8 @@ public int hashCode() {
indices,
indexSnapshots,
shardGenerations,
indexMetaDataGenerations
indexMetaDataGenerations,
snapshotTypes
);
}

Expand Down Expand Up @@ -542,6 +597,7 @@ public List<IndexId> resolveNewIndices(List<String> indicesToResolve, Map<String
private static final String UUID = "uuid";
private static final String STATE = "state";
private static final String VERSION = "version";
private static final String SNAPSHOT_TYPE = "snapshot_type";
private static final String MIN_VERSION = "min_version";

/**
Expand Down Expand Up @@ -570,6 +626,10 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
if (version != null) {
builder.field(VERSION, version.toString());
}
final SnapshotType snapshotType = snapshotTypes.get(snapshotUUID);
if (snapshotType != null) {
builder.field(SNAPSHOT_TYPE, snapshotType.toString());
}
builder.endObject();
}
builder.endArray();
Expand Down Expand Up @@ -611,6 +671,7 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g
final Map<String, SnapshotId> snapshots = new HashMap<>();
final Map<String, SnapshotState> snapshotStates = new HashMap<>();
final Map<String, Version> snapshotVersions = new HashMap<>();
final Map<String, SnapshotType> snapshotTypes = new HashMap<>();
final Map<IndexId, List<SnapshotId>> indexSnapshots = new HashMap<>();
final Map<String, IndexId> indexLookup = new HashMap<>();
final ShardGenerations.Builder shardGenerations = ShardGenerations.builder();
Expand All @@ -620,7 +681,7 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g
final String field = parser.currentName();
switch (field) {
case SNAPSHOTS:
parseSnapshots(parser, snapshots, snapshotStates, snapshotVersions, indexMetaLookup);
parseSnapshots(parser, snapshots, snapshotStates, snapshotVersions, indexMetaLookup, snapshotTypes);
break;
case INDICES:
parseIndices(parser, snapshots, indexSnapshots, indexLookup, shardGenerations);
Expand All @@ -646,7 +707,8 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g
snapshotVersions,
indexSnapshots,
shardGenerations.build(),
buildIndexMetaGenerations(indexMetaLookup, indexLookup, indexMetaIdentifiers)
buildIndexMetaGenerations(indexMetaLookup, indexLookup, indexMetaIdentifiers),
snapshotTypes
);
}

Expand Down Expand Up @@ -697,7 +759,8 @@ private static void parseSnapshots(
Map<String, SnapshotId> snapshots,
Map<String, SnapshotState> snapshotStates,
Map<String, Version> snapshotVersions,
Map<SnapshotId, Map<String, String>> indexMetaLookup
Map<SnapshotId, Map<String, String>> indexMetaLookup,
Map<String, SnapshotType> snapshotTypes
) throws IOException {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.nextToken(), parser);
final Map<String, String> stringDeduplicator = new HashMap<>();
Expand All @@ -707,6 +770,7 @@ private static void parseSnapshots(
SnapshotState state = null;
Map<String, String> metaGenerations = null;
Version version = null;
SnapshotType snapshotType = null;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String currentFieldName = parser.currentName();
parser.nextToken();
Expand All @@ -726,6 +790,9 @@ private static void parseSnapshots(
case VERSION:
version = Version.fromString(parser.text());
break;
case SNAPSHOT_TYPE:
snapshotType = SnapshotType.fromString(parser.text());
break;
}
}
final SnapshotId snapshotId = new SnapshotId(name, uuid);
Expand All @@ -735,6 +802,9 @@ private static void parseSnapshots(
if (version != null) {
snapshotVersions.put(uuid, version);
}
if (snapshotType != null) {
snapshotTypes.put(uuid, snapshotType);
}
snapshots.put(uuid, snapshotId);
if (metaGenerations != null && metaGenerations.isEmpty() == false) {
indexMetaLookup.put(snapshotId, metaGenerations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotMissingException;
import org.opensearch.snapshots.SnapshotType;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -1722,7 +1723,8 @@ public void finalizeSnapshot(
Version.CURRENT,
shardGenerations,
indexMetas,
indexMetaIdentifiers
indexMetaIdentifiers,
snapshotInfo.getSnapshotType()
);
writeIndexGen(
updatedRepositoryData,
Expand Down Expand Up @@ -2335,22 +2337,23 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS

// Step 2: Write new index-N blob to repository and update index.latest
setPendingStep.whenComplete(newGen -> threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
// BwC logic: Load snapshot version information if any snapshot is missing a version in RepositoryData so that the new
// RepositoryData contains a version for every snapshot
final List<SnapshotId> snapshotIdsWithoutVersion = repositoryData.getSnapshotIds()
// BwC logic: Load snapshot version or snapshot type information if any snapshot is missing a version or
// type in RepositoryData so that the new RepositoryData contains version and type for every snapshot
final List<SnapshotId> snapshotIdsWithoutVersionOrType = repositoryData.getSnapshotIds()
.stream()
.filter(snapshotId -> repositoryData.getVersion(snapshotId) == null)
.collect(Collectors.toList());
if (snapshotIdsWithoutVersion.isEmpty() == false) {
if (snapshotIdsWithoutVersionOrType.isEmpty() == false) {
final Map<SnapshotId, Version> updatedVersionMap = new ConcurrentHashMap<>();
final GroupedActionListener<Void> loadAllVersionsListener = new GroupedActionListener<>(
final Map<SnapshotId, SnapshotType> updatedSnapshotTypeMap = new ConcurrentHashMap<>();
final GroupedActionListener<Void> loadMissingDataListener = new GroupedActionListener<>(
ActionListener.runAfter(new ActionListener<Collection<Void>>() {
@Override
public void onResponse(Collection<Void> voids) {
logger.info(
"Successfully loaded all snapshot's version information for {} from snapshot metadata",
"Successfully loaded all snapshot's version and type information for {} from snapshot metadata",
AllocationService.firstListElementsToCommaDelimitedString(
snapshotIdsWithoutVersion,
snapshotIdsWithoutVersionOrType,
SnapshotId::toString,
logger.isDebugEnabled()
)
Expand All @@ -2359,19 +2362,20 @@ public void onResponse(Collection<Void> voids) {

@Override
public void onFailure(Exception e) {
logger.warn("Failure when trying to load missing version information from snapshot metadata", e);
logger.warn("Failure when trying to load missing version or type information from snapshot metadata", e);
}
}, () -> filterRepositoryDataStep.onResponse(repositoryData.withVersions(updatedVersionMap))),
snapshotIdsWithoutVersion.size()
},
() -> filterRepositoryDataStep.onResponse(
repositoryData.withVersions(updatedVersionMap).withSnapshotTypes(updatedSnapshotTypeMap)
)
),
snapshotIdsWithoutVersionOrType.size()
);
for (SnapshotId snapshotId : snapshotIdsWithoutVersion) {
threadPool().executor(ThreadPool.Names.SNAPSHOT)
.execute(
ActionRunnable.run(
loadAllVersionsListener,
() -> updatedVersionMap.put(snapshotId, getSnapshotInfo(snapshotId).version())
)
);
for (SnapshotId snapshotId : snapshotIdsWithoutVersionOrType) {
threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(loadMissingDataListener, () -> {
updatedVersionMap.put(snapshotId, getSnapshotInfo(snapshotId).version());
updatedSnapshotTypeMap.put(snapshotId, getSnapshotInfo(snapshotId).getSnapshotType());
}));
}
} else {
filterRepositoryDataStep.onResponse(repositoryData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,13 @@ public Boolean isRemoteStoreIndexShallowCopyEnabled() {
return remoteStoreIndexShallowCopy;
}

public SnapshotType getSnapshotType() {
if (remoteStoreIndexShallowCopy != null && remoteStoreIndexShallowCopy) {
return SnapshotType.SHALLOW_COPY;
}
return SnapshotType.FULL_COPY;
}

/**
* Returns shard failures; an empty list will be returned if there were no shard
* failures, or if {@link #state()} returns {@code null}.
Expand Down
Loading

0 comments on commit 4d8fa9c

Please sign in to comment.