[RFS] Split RFS Worker into phase-specific entrypoints (opensearch-project#719)

* Split RFS Worker into phase-specific entrypoints

Signed-off-by: Chris Helma <chelma+github@amazon.com>

* Fixed an RFS logging bug

Signed-off-by: Chris Helma <chelma+github@amazon.com>

---------

Signed-off-by: Chris Helma <chelma+github@amazon.com>
chelma authored Jun 13, 2024
1 parent 055e07a commit 5a5b7e6
Showing 8 changed files with 416 additions and 36 deletions.
15 changes: 15 additions & 0 deletions RFS/build.gradle
@@ -86,6 +86,21 @@ task runRfsWorker (type: JavaExec) {
mainClass = 'com.rfs.RunRfsWorker'
}

task createSnapshot (type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = 'com.rfs.RfsCreateSnapshot'
}

task migrateMetadata (type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = 'com.rfs.RfsMigrateMetadata'
}

task migrateDocuments (type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = 'com.rfs.RfsMigrateDocuments'
}
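
Each new task mirrors the existing runRfsWorker JavaExec task, so a single phase can be launched on its own. Assuming a Gradle version that supports the standard --args option on JavaExec tasks (4.9 or newer), the snapshot phase could be invoked like this; the hosts and bucket are placeholders, not repo defaults:

./gradlew createSnapshot --args='--snapshot-name my-snapshot --s3-repo-uri s3://my-bucket/dir1/dir2 --s3-region us-east-2 --source-host http://localhost:19200 --target-host http://localhost:29200'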

// Cleanup additional docker build directory
clean.doFirst {
delete project.file("./docker/build")
92 changes: 92 additions & 0 deletions RFS/src/main/java/com/rfs/RfsCreateSnapshot.java
@@ -0,0 +1,92 @@
package com.rfs;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import com.rfs.cms.CmsClient;
import com.rfs.cms.OpenSearchCmsClient;
import com.rfs.common.ConnectionDetails;
import com.rfs.common.Logging;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.SnapshotCreator;
import com.rfs.common.TryHandlePhaseFailure;
import com.rfs.common.S3SnapshotCreator;
import com.rfs.worker.GlobalState;
import com.rfs.worker.SnapshotRunner;

public class RfsCreateSnapshot {
private static final Logger logger = LogManager.getLogger(RfsCreateSnapshot.class);

public static class Args {
@Parameter(names = {"--snapshot-name"}, description = "The name of the snapshot to migrate", required = true)
public String snapshotName;

@Parameter(names = {"--s3-repo-uri"}, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2", required = true)
public String s3RepoUri;

@Parameter(names = {"--s3-region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = true)
public String s3Region;

@Parameter(names = {"--source-host"}, description = "The source host and port (e.g. http://localhost:9200)", required = true)
public String sourceHost;

@Parameter(names = {"--source-username"}, description = "Optional. The source username; if not provided, will assume no auth on source", required = false)
public String sourceUser = null;

@Parameter(names = {"--source-password"}, description = "Optional. The source password; if not provided, will assume no auth on source", required = false)
public String sourcePass = null;

@Parameter(names = {"--target-host"}, description = "The target host and port (e.g. http://localhost:9200)", required = true)
public String targetHost;

@Parameter(names = {"--target-username"}, description = "Optional. The target username; if not provided, will assume no auth on target", required = false)
public String targetUser = null;

@Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false)
public String targetPass = null;

@Parameter(names = {"--log-level"}, description = "What log level you want. Default: 'info'", required = false, converter = Logging.ArgsConverter.class)
public Level logLevel = Level.INFO;
}

public static void main(String[] args) throws Exception {
// Parse the command-line args
Args arguments = new Args();
JCommander.newBuilder()
.addObject(arguments)
.build()
.parse(args);

final String snapshotName = arguments.snapshotName;
final String s3RepoUri = arguments.s3RepoUri;
final String s3Region = arguments.s3Region;
final String sourceHost = arguments.sourceHost;
final String sourceUser = arguments.sourceUser;
final String sourcePass = arguments.sourcePass;
final String targetHost = arguments.targetHost;
final String targetUser = arguments.targetUser;
final String targetPass = arguments.targetPass;
final Level logLevel = arguments.logLevel;

Logging.setLevel(logLevel);

final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);

TryHandlePhaseFailure.executeWithTryCatch(() -> {
logger.info("Running RfsCreateSnapshot");
GlobalState globalState = GlobalState.getInstance();
OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection);
OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
final CmsClient cmsClient = new OpenSearchCmsClient(targetClient);

final SnapshotCreator snapshotCreator = new S3SnapshotCreator(snapshotName, sourceClient, s3RepoUri, s3Region);
final SnapshotRunner snapshotWorker = new SnapshotRunner(globalState, cmsClient, snapshotCreator);
snapshotWorker.run();
});
}
}
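
All three entrypoints funnel their phase logic through TryHandlePhaseFailure.executeWithTryCatch, but that class is not part of this diff. As a rough sketch of the wrapper pattern they share (the class and method names appear in the imports above; the interface and body below are assumptions, not code from the repo):

package com.rfs.common;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Hypothetical sketch; the real TryHandlePhaseFailure lives elsewhere in the RFS codebase.
public class TryHandlePhaseFailure {
    private static final Logger logger = LogManager.getLogger(TryHandlePhaseFailure.class);

    @FunctionalInterface
    public interface TryBlock {
        void run() throws Exception; // lets phase logic throw checked exceptions from the lambda
    }

    public static void executeWithTryCatch(TryBlock block) throws Exception {
        try {
            block.run();
        } catch (Exception e) {
            // Assumed behavior: record the phase failure once, then rethrow so the
            // process exits non-zero and an orchestrator can react.
            logger.error("Phase failed", e);
            throw e;
        }
    }
}

This matches how each main() is declared with throws Exception: the wrapper logs and propagates rather than swallowing failures.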
117 changes: 117 additions & 0 deletions RFS/src/main/java/com/rfs/RfsMigrateDocuments.java
@@ -0,0 +1,117 @@
package com.rfs;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;


import com.rfs.cms.CmsClient;
import com.rfs.cms.OpenSearchCmsClient;
import com.rfs.common.ConnectionDetails;
import com.rfs.common.DefaultSourceRepoAccessor;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.IndexMetadata;
import com.rfs.common.Logging;
import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.S3Uri;
import com.rfs.common.ShardMetadata;
import com.rfs.common.S3Repo;
import com.rfs.common.SourceRepo;
import com.rfs.common.TryHandlePhaseFailure;
import com.rfs.common.SnapshotRepo;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10;
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
import com.rfs.worker.DocumentsRunner;
import com.rfs.worker.GlobalState;

public class RfsMigrateDocuments {
private static final Logger logger = LogManager.getLogger(RfsMigrateDocuments.class);

public static class Args {
@Parameter(names = {"--snapshot-name"}, description = "The name of the snapshot to migrate", required = true)
public String snapshotName;

@Parameter(names = {"--s3-local-dir"}, description = "The absolute path to the directory on local disk to download S3 files to", required = true)
public String s3LocalDirPath;

@Parameter(names = {"--s3-repo-uri"}, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2", required = true)
public String s3RepoUri;

@Parameter(names = {"--s3-region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = true)
public String s3Region;

@Parameter(names = {"--lucene-dir"}, description = "The absolute path to the directory where we'll put the Lucene docs", required = true)
public String luceneDirPath;

@Parameter(names = {"--target-host"}, description = "The target host and port (e.g. http://localhost:9200)", required = true)
public String targetHost;

@Parameter(names = {"--target-username"}, description = "Optional. The target username; if not provided, will assume no auth on target", required = false)
public String targetUser = null;

@Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false)
public String targetPass = null;

@Parameter(names = {"--max-shard-size-bytes"}, description = ("Optional. The maximum shard size, in bytes, to allow when"
+ " performing the document migration. Useful for preventing disk overflow. Default: 50 * 1024 * 1024 * 1024 (50 GB)"), required = false)
public long maxShardSizeBytes = 50 * 1024 * 1024 * 1024L;

@Parameter(names = {"--log-level"}, description = "What log level you want. Default: 'info'", required = false, converter = Logging.ArgsConverter.class)
public Level logLevel = Level.INFO;
}

public static void main(String[] args) throws Exception {
// Parse the command-line args
Args arguments = new Args();
JCommander.newBuilder()
.addObject(arguments)
.build()
.parse(args);

final String snapshotName = arguments.snapshotName;
final Path s3LocalDirPath = Paths.get(arguments.s3LocalDirPath);
final String s3RepoUri = arguments.s3RepoUri;
final String s3Region = arguments.s3Region;
final Path luceneDirPath = Paths.get(arguments.luceneDirPath);
final String targetHost = arguments.targetHost;
final String targetUser = arguments.targetUser;
final String targetPass = arguments.targetPass;
final long maxShardSizeBytes = arguments.maxShardSizeBytes;
final Level logLevel = arguments.logLevel;

Logging.setLevel(logLevel);

final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);

TryHandlePhaseFailure.executeWithTryCatch(() -> {
logger.info("Running RfsMigrateDocuments");

GlobalState globalState = GlobalState.getInstance();
OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
final CmsClient cmsClient = new OpenSearchCmsClient(targetClient);

final SourceRepo sourceRepo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region);
final SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);

final IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
final ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
final DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
final SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
final LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath);
final DocumentReindexer reindexer = new DocumentReindexer(targetClient);

DocumentsRunner documentsWorker = new DocumentsRunner(globalState, cmsClient, snapshotName, maxShardSizeBytes, indexMetadataFactory, shardMetadataFactory, unpackerFactory, reader, reindexer);
documentsWorker.run();
});
}
}
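
For local smoke testing, the documents phase could also be driven programmatically with the same flags the migrateDocuments Gradle task would forward. This driver is hypothetical, and every path, host, and name in it is a placeholder:

package com.rfs;

// Hypothetical smoke-test driver; values are illustrative, not taken from the repo.
public class RfsMigrateDocumentsExample {
    public static void main(String[] ignored) throws Exception {
        RfsMigrateDocuments.main(new String[] {
            "--snapshot-name", "my-snapshot",
            "--s3-local-dir", "/tmp/rfs/s3",
            "--s3-repo-uri", "s3://my-bucket/dir1/dir2",
            "--s3-region", "us-east-2",
            "--lucene-dir", "/tmp/rfs/lucene",
            "--target-host", "http://localhost:29200"
        });
    }
}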
130 changes: 130 additions & 0 deletions RFS/src/main/java/com/rfs/RfsMigrateMetadata.java
@@ -0,0 +1,130 @@
package com.rfs;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;


import com.rfs.cms.CmsClient;
import com.rfs.cms.OpenSearchCmsClient;
import com.rfs.common.ClusterVersion;
import com.rfs.common.ConnectionDetails;
import com.rfs.common.GlobalMetadata;
import com.rfs.common.IndexMetadata;
import com.rfs.common.Logging;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.S3Uri;
import com.rfs.common.S3Repo;
import com.rfs.common.SourceRepo;
import com.rfs.common.TryHandlePhaseFailure;
import com.rfs.common.SnapshotRepo;
import com.rfs.transformers.TransformFunctions;
import com.rfs.transformers.Transformer;
import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
import com.rfs.worker.GlobalState;
import com.rfs.worker.IndexRunner;
import com.rfs.worker.MetadataRunner;

public class RfsMigrateMetadata {
private static final Logger logger = LogManager.getLogger(RfsMigrateMetadata.class);

public static class Args {
@Parameter(names = {"--snapshot-name"}, description = "The name of the snapshot to migrate", required = true)
public String snapshotName;

@Parameter(names = {"--s3-local-dir"}, description = "The absolute path to the directory on local disk to download S3 files to", required = true)
public String s3LocalDirPath;

@Parameter(names = {"--s3-repo-uri"}, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2", required = true)
public String s3RepoUri;

@Parameter(names = {"--s3-region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = true)
public String s3Region;

@Parameter(names = {"--target-host"}, description = "The target host and port (e.g. http://localhost:9200)", required = true)
public String targetHost;

@Parameter(names = {"--target-username"}, description = "Optional. The target username; if not provided, will assume no auth on target", required = false)
public String targetUser = null;

@Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false)
public String targetPass = null;

@Parameter(names = {"--index-allowlist"}, description = ("Optional. List of index names to migrate"
+ " (e.g. 'logs_2024_01, logs_2024_02'). Default: all indices"), required = false)
public List<String> indexAllowlist = List.of();

@Parameter(names = {"--index-template-allowlist"}, description = ("Optional. List of index template names to migrate"
+ " (e.g. 'posts_index_template1, posts_index_template2'). Default: empty list"), required = false)
public List<String> indexTemplateAllowlist = List.of();

@Parameter(names = {"--component-template-allowlist"}, description = ("Optional. List of component template names to migrate"
+ " (e.g. 'posts_template1, posts_template2'). Default: empty list"), required = false)
public List<String> componentTemplateAllowlist = List.of();

//https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/
@Parameter(names = {"--min-replicas"}, description = ("Optional. The minimum number of replicas configured for migrated indices on the target."
+ " This can be useful for migrating to targets which use zonal deployments and require additional replicas to meet zone requirements. Default: 0")
, required = false)
public int minNumberOfReplicas = 0;

@Parameter(names = {"--log-level"}, description = "What log level you want. Default: 'info'", required = false, converter = Logging.ArgsConverter.class)
public Level logLevel = Level.INFO;
}

public static void main(String[] args) throws Exception {
// Parse the command-line args
Args arguments = new Args();
JCommander.newBuilder()
.addObject(arguments)
.build()
.parse(args);

final String snapshotName = arguments.snapshotName;
final Path s3LocalDirPath = Paths.get(arguments.s3LocalDirPath);
final String s3RepoUri = arguments.s3RepoUri;
final String s3Region = arguments.s3Region;
final String targetHost = arguments.targetHost;
final String targetUser = arguments.targetUser;
final String targetPass = arguments.targetPass;
final List<String> indexTemplateAllowlist = arguments.indexTemplateAllowlist;
final List<String> componentTemplateAllowlist = arguments.componentTemplateAllowlist;
final int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
final Level logLevel = arguments.logLevel;

Logging.setLevel(logLevel);

final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);

TryHandlePhaseFailure.executeWithTryCatch(() -> {
logger.info("Running RfsMigrateMetadata");
GlobalState globalState = GlobalState.getInstance();
OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
final CmsClient cmsClient = new OpenSearchCmsClient(targetClient);

final SourceRepo sourceRepo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region);
final SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
final GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider);
final GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateAllowlist, indexTemplateAllowlist);
final Transformer transformer = TransformFunctions.getTransformer(ClusterVersion.ES_7_10, ClusterVersion.OS_2_11, awarenessDimensionality);
MetadataRunner metadataWorker = new MetadataRunner(globalState, cmsClient, snapshotName, metadataFactory, metadataCreator, transformer);
metadataWorker.run();

final IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
final IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
final IndexRunner indexWorker = new IndexRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, indexCreator, transformer);
indexWorker.run();
});
}
}
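
Following the same hypothetical driver pattern, the metadata phase takes its allowlists as List<String> flags, which JCommander splits on commas by default, so several names can be passed in a single flag. All template names and hosts below are made up:

// Hypothetical invocation of the metadata phase; values are illustrative only.
RfsMigrateMetadata.main(new String[] {
    "--snapshot-name", "my-snapshot",
    "--s3-local-dir", "/tmp/rfs/s3",
    "--s3-repo-uri", "s3://my-bucket/dir1/dir2",
    "--s3-region", "us-east-2",
    "--target-host", "http://localhost:29200",
    // comma-separated values populate the List<String> parameters
    "--index-template-allowlist", "posts_index_template1,posts_index_template2",
    // min-replicas 2 makes awarenessDimensionality 3 inside main()
    "--min-replicas", "2"
});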