Skip to content

Commit

Permalink
Handle transformation exceptions more gracefully
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Nied <peternied@hotmail.com>
  • Loading branch information
peternied committed Oct 9, 2024
1 parent 08e4f61 commit 6ad40bf
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import org.opensearch.migrations.bulkload.transformers.TransformFunctions;
import org.opensearch.migrations.bulkload.transformers.Transformer;
import org.opensearch.migrations.bulkload.worker.IndexMetadataResults;
import org.opensearch.migrations.bulkload.worker.IndexRunner;
import org.opensearch.migrations.bulkload.worker.IndexReadException;
import org.opensearch.migrations.bulkload.worker.MetadataRunner;
import org.opensearch.migrations.cli.ClusterReaderExtractor;
import org.opensearch.migrations.cli.Clusters;
Expand Down Expand Up @@ -68,7 +68,7 @@ protected Items migrateAllItems(MigrationMode migrationMode, Clusters clusters,

if (metadataResults.fatalIssueCount() == 0) {
var indexResults = migrateIndices(migrationMode, clusters, transformer, context);
items.indexes(indexResults.getIndexNames());
items.indexes(indexResults.getIndexes());
items.aliases(indexResults.getAliases());
} else {
items.indexes(List.of());
Expand All @@ -93,7 +93,7 @@ private GlobalMetadataCreatorResults migrateGlobalMetadata(MigrationMode mode, C
}

private IndexMetadataResults migrateIndices(MigrationMode mode, Clusters clusters, Transformer transformer, RootMetadataMigrationContext context) {
var indexRunner = new IndexRunner(
var indexRunner = new IndexReadException(
arguments.snapshotName,
clusters.getSource().getIndexMetadata(),
clusters.getTarget().getIndexCreator(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ class EndToEndTest {

private static Stream<Arguments> scenarios() {
return Stream.of(
Arguments.of(TransferMedium.Http, MetadataCommands.EVALUATE),
Arguments.of(TransferMedium.SnapshotImage, MetadataCommands.MIGRATE),
Arguments.of(TransferMedium.Http, MetadataCommands.MIGRATE)
Arguments.of(TransferMedium.Http, MetadataCommands.EVALUATE)
// Arguments.of(TransferMedium.SnapshotImage, MetadataCommands.MIGRATE),
// Arguments.of(TransferMedium.Http, MetadataCommands.MIGRATE)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.opensearch.migrations.bulkload.transformers;

import org.opensearch.migrations.bulkload.common.RfsException;

public class IndexTransformationException extends RfsException {
public IndexTransformationException(String indexName, Throwable cause) {
super("Transformation for index index '" + indexName + "' failed.", cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ else if (val instanceof ObjectNode) {
* it regardless of what it is named.
*/
public static ObjectNode getMappingsFromBeneathIntermediate(ObjectNode mappingsRoot) {
if (mappingsRoot.has(PROPERTIES_KEY_STR)) {
if (mappingsRoot.size() == 0) {
return mappingsRoot;
} else if (mappingsRoot.has(PROPERTIES_KEY_STR)) {
return mappingsRoot;
} else if (!mappingsRoot.has(PROPERTIES_KEY_STR)) {
return (ObjectNode) mappingsRoot.get(mappingsRoot.fieldNames().next()).deepCopy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
@Builder
public class IndexMetadataResults {
@Singular
private final List<CreationResult> indexNames;
private final List<CreationResult> indexes;
@Singular
private final List<CreationResult> aliases;

public List<CreationResult> getIndexNames() {
return indexNames == null ? List.of() : indexNames;
public List<CreationResult> getIndexes() {
return indexes == null ? List.of() : indexes;
}

public List<CreationResult> getAliases() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import org.opensearch.migrations.MigrationMode;
import org.opensearch.migrations.bulkload.common.FilterScheme;
import org.opensearch.migrations.bulkload.models.IndexMetadata;
import org.opensearch.migrations.bulkload.transformers.IndexTransformationException;
import org.opensearch.migrations.bulkload.transformers.Transformer;
import org.opensearch.migrations.metadata.CreationResult;
import org.opensearch.migrations.metadata.CreationResult.CreationFailureType;
import org.opensearch.migrations.metadata.IndexCreator;
import org.opensearch.migrations.metadata.tracing.IMetadataMigrationContexts.ICreateIndexContext;

Expand Down Expand Up @@ -39,12 +41,25 @@ public IndexMetadataResults migrateIndices(MigrationMode mode, ICreateIndexConte
.forEach(index -> {
var indexName = index.getName();
var indexMetadata = metadataFactory.fromRepo(snapshotName, indexName);
var transformedRoot = transformer.transformIndexMetadata(indexMetadata);
var indexResult = indexCreator.create(transformedRoot, mode, context);
results.indexName(indexResult);
transformedRoot.getAliases().fieldNames().forEachRemaining( alias -> {

CreationResult indexResult = null;
try {
indexMetadata = transformer.transformIndexMetadata(indexMetadata);
indexResult = indexCreator.create(indexMetadata, mode, context);
} catch (Throwable t) {
indexResult = CreationResult.builder()
.name(indexName)
.exception(new IndexTransformationException(indexName, t))
.failureType(CreationFailureType.UNABLE_TO_TRANSFORM_FAILURE)
.build();
}

var finalResult = indexResult;
results.index(finalResult);

indexMetadata.getAliases().fieldNames().forEachRemaining(alias -> {
var aliasResult = CreationResult.builder().name(alias);
aliasResult.failureType(indexResult.getFailureType());
aliasResult.failureType(finalResult.getFailureType());
results.alias(aliasResult.build());
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public boolean wasFatal() {
@Getter
public static enum CreationFailureType {
ALREADY_EXISTS(false, "already exists"),
UNABLE_TO_TRANSFORM_FAILURE(true, "failed to transform to the target version"),
TARGET_CLUSTER_FAILURE(true, "failed on target cluster");

private final boolean fatal;
Expand Down

0 comments on commit 6ad40bf

Please sign in to comment.