Skip to content

Commit

Permalink
WIP, doesn't compile. A lot of in-progress changes to support a full RFS JUnit test.
Browse files Browse the repository at this point in the history

Signed-off-by: Greg Schohn <greg.schohn@gmail.com>
  • Loading branch information
gregschohn committed Jun 18, 2024
1 parent e075d83 commit 8703c20
Show file tree
Hide file tree
Showing 16 changed files with 426 additions and 328 deletions.
80 changes: 40 additions & 40 deletions CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,79 +3,79 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

import com.rfs.common.UsernamePassword;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import com.rfs.common.ConnectionDetails;
import com.rfs.common.Logging;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.SnapshotCreator;
import com.rfs.common.TryHandlePhaseFailure;
import com.rfs.common.S3SnapshotCreator;
import com.rfs.worker.SnapshotRunner;

import java.util.Optional;
import java.util.function.Function;

@Slf4j
public class CreateSnapshot {
public static class Args {
@Parameter(names = {"--snapshot-name"}, description = "The name of the snapshot to migrate", required = true)
@Parameter(names = {"--snapshot-name"},
required = true,
description = "The name of the snapshot to migrate")
public String snapshotName;

@Parameter(names = {"--s3-repo-uri"}, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2", required = true)
@Parameter(names = {"--s3-repo-uri"},
required = true,
description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2")
public String s3RepoUri;

@Parameter(names = {"--s3-region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = true)
@Parameter(names = {"--s3-region"},
required = true,
description = "The AWS Region the S3 bucket is in, like: us-east-2"
)
public String s3Region;

@Parameter(names = {"--source-host"}, description = "The source host and port (e.g. http://localhost:9200)", required = true)
@Parameter(names = {"--source-host"},
required = true,
description = "The source host and port (e.g. http://localhost:9200)")
public String sourceHost;

@Parameter(names = {"--source-username"}, description = "Optional. The source username; if not provided, will assume no auth on source", required = false)
@Parameter(names = {"--source-username"},
description = "Optional. The source username; if not provided, will assume no auth on source")
public String sourceUser = null;

@Parameter(names = {"--source-password"}, description = "Optional. The source password; if not provided, will assume no auth on source", required = false)
@Parameter(names = {"--source-password"},
description = "Optional. The source password; if not provided, will assume no auth on source")
public String sourcePass = null;
}

@Parameter(names = {"--target-host"}, description = "The target host and port (e.g. http://localhost:9200)", required = true)
public String targetHost;

@Parameter(names = {"--target-username"}, description = "Optional. The target username; if not provided, will assume no auth on target", required = false)
public String targetUser = null;

@Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false)
public String targetPass = null;
@Getter
@AllArgsConstructor
public static class S3RepoInfo {
String awsRegion;
String repoUri;
}

public static void main(String[] args) throws Exception {
// Grab out args
Args arguments = new Args();
JCommander.newBuilder()
.addObject(arguments)
.build()
.parse(args);
.addObject(arguments)
.build()
.parse(args);

final String snapshotName = arguments.snapshotName;
final String s3RepoUri = arguments.s3RepoUri;
final String s3Region = arguments.s3Region;
final String sourceHost = arguments.sourceHost;
final String sourceUser = arguments.sourceUser;
final String sourcePass = arguments.sourcePass;
final String targetHost = arguments.targetHost;
final String targetUser = arguments.targetUser;
final String targetPass = arguments.targetPass;

final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);
log.info("Running CreateSnapshot with " + String.join(" ", args));
run(c -> new S3SnapshotCreator(arguments.snapshotName, c, arguments.s3RepoUri, arguments.s3Region),
new OpenSearchClient(arguments.sourceHost, arguments.sourceUser, arguments.sourcePass));
}

public static void run(Function<OpenSearchClient,SnapshotCreator> snapshotCreatorFactory,
OpenSearchClient openSearchClient)
throws Exception {
TryHandlePhaseFailure.executeWithTryCatch(() -> {
log.info("Running RfsWorker");
OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection);
OpenSearchClient targetClient = new OpenSearchClient(targetConnection);

final SnapshotCreator snapshotCreator =
new S3SnapshotCreator(snapshotName, sourceClient, s3RepoUri, s3Region);
SnapshotRunner.runAndWaitForCompletion(snapshotCreator);
SnapshotRunner.runAndWaitForCompletion(snapshotCreatorFactory.apply(openSearchClient));
});
}
}
12 changes: 12 additions & 0 deletions DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ dependencies {
implementation group: 'com.beust', name: 'jcommander'
implementation group: 'org.slf4j', name: 'slf4j-api'
implementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl'

testImplementation testFixtures(project(":RFS"))
testImplementation project(":CreateSnapshot")
testImplementation project(":MetadataMigration")
testImplementation group: 'org.apache.lucene', name: 'lucene-core'
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api'
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-params'
testImplementation group: 'org.opensearch', name: 'opensearch-testcontainers'
testImplementation group: 'org.testcontainers', name: 'testcontainers'

testImplementation platform('io.projectreactor:reactor-bom:2023.0.5')
testImplementation group: 'io.projectreactor.netty', name: 'reactor-netty-core'
}

application {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Clock;
import java.util.UUID;
import java.util.function.Function;

import lombok.extern.slf4j.Slf4j;
import com.rfs.cms.ApacheHttpClient;
Expand Down Expand Up @@ -41,30 +43,45 @@
@Slf4j
public class RfsMigrateDocuments {
public static final int PROCESS_TIMED_OUT = 1;
public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5;

public static class Args {
@Parameter(names = {"--snapshot-name"}, description = "The name of the snapshot to migrate", required = true)
@Parameter(names = {"--snapshot-name"},
required = true,
description = "The name of the snapshot to migrate")
public String snapshotName;

@Parameter(names = {"--s3-local-dir"}, description = "The absolute path to the directory on local disk to download S3 files to", required = true)
@Parameter(names = {"--s3-local-dir"},
required = true,
description = "The absolute path to the directory on local disk to download S3 files to")
public String s3LocalDirPath;

@Parameter(names = {"--s3-repo-uri"}, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2", required = true)
@Parameter(names = {"--s3-repo-uri"},
required = true,
description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2")
public String s3RepoUri;

@Parameter(names = {"--s3-region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = true)
@Parameter(names = {"--s3-region"},
required = true,
description = "The AWS Region the S3 bucket is in, like: us-east-2")
public String s3Region;

@Parameter(names = {"--lucene-dir"}, description = "The absolute path to the directory where we'll put the Lucene docs", required = true)
@Parameter(names = {"--lucene-dir"},
required = true,
description = "The absolute path to the directory where we'll put the Lucene docs")
public String luceneDirPath;

@Parameter(names = {"--target-host"}, description = "The target host and port (e.g. http://localhost:9200)", required = true)
@Parameter(names = {"--target-host"},
required = true,
description = "The target host and port (e.g. http://localhost:9200)")
public String targetHost;

@Parameter(names = {"--target-username"}, description = "Optional. The target username; if not provided, will assume no auth on target", required = false)
@Parameter(names = {"--target-username"},
description = "Optional. The target username; if not provided, will assume no auth on target")
public String targetUser = null;

@Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false)
@Parameter(names = {"--target-password"},
description = "Optional. The target password; if not provided, will assume no auth on target")
public String targetPass = null;

@Parameter(names = {"--max-shard-size-bytes"}, description = ("Optional. The maximum shard size, in bytes, to allow when"
Expand All @@ -76,48 +93,61 @@ public static void main(String[] args) throws Exception {
// Grab out args
Args arguments = new Args();
JCommander.newBuilder()
.addObject(arguments)
.build()
.parse(args);

final String snapshotName = arguments.snapshotName;
final Path s3LocalDirPath = Paths.get(arguments.s3LocalDirPath);
final String s3RepoUri = arguments.s3RepoUri;
final String s3Region = arguments.s3Region;
final Path luceneDirPath = Paths.get(arguments.luceneDirPath);
final String targetHost = arguments.targetHost;
final String targetUser = arguments.targetUser;
final String targetPass = arguments.targetPass;
final long maxShardSizeBytes = arguments.maxShardSizeBytes;

final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);
.addObject(arguments)
.build()
.parse(args);

var luceneDirPath = Paths.get(arguments.luceneDirPath);
var processManager = new ProcessManager(workItemId->{
log.error("terminating RunRfsWorker because its lease has expired for " + workItemId);
System.exit(PROCESS_TIMED_OUT);
}, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(arguments.targetHost)),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString());

TryHandlePhaseFailure.executeWithTryCatch(() -> {
log.info("Running RfsWorker");

OpenSearchClient targetClient = new OpenSearchClient(targetConnection);

final SourceRepo sourceRepo = S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region);
final SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);

final IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
final ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
final DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
final SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor, luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);
final LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath);
final DocumentReindexer reindexer = new DocumentReindexer(targetClient);

var processManager = new ProcessManager(workItemId->{
log.error("terminating RunRfsWorker because its lease has expired for "+workItemId);
System.exit(PROCESS_TIMED_OUT);
}, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(targetHost)),
5, UUID.randomUUID().toString());

var scopedWorkCoordinator = new ScopedWorkCoordinatorHelper(workCoordinator, processManager);
new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName);
new DocumentsRunner(scopedWorkCoordinator, snapshotName,
shardMetadataFactory, unpackerFactory, reader, reindexer).migrateNextShard();
OpenSearchClient targetClient =
new OpenSearchClient(arguments.targetHost, arguments.targetUser, arguments.targetPass);
DocumentReindexer reindexer = new DocumentReindexer(targetClient);

SourceRepo sourceRepo = S3Repo.create(Paths.get(arguments.s3LocalDirPath),
new S3Uri(arguments.s3RepoUri), arguments.s3Region);
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,
luceneDirPath, ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES);

run(LuceneDocumentsReader::new, reindexer, workCoordinator, processManager, indexMetadataFactory,
arguments.snapshotName, shardMetadataFactory, unpackerFactory, arguments.maxShardSizeBytes);
});
}

public static void run(Function<Path,LuceneDocumentsReader> readerFactory,
DocumentReindexer reindexer,
OpenSearchWorkCoordinator workCoordinator,
ProcessManager processManager,
IndexMetadata.Factory indexMetadataFactory,
String snapshotName,
ShardMetadata.Factory shardMetadataFactory,
SnapshotShardUnpacker.Factory unpackerFactory,
long maxShardSizeBytes)
throws IOException {
var scopedWorkCoordinator = new ScopedWorkCoordinatorHelper(workCoordinator, processManager);
new ShardWorkPreparer().run(scopedWorkCoordinator, indexMetadataFactory, snapshotName);
new DocumentsRunner(scopedWorkCoordinator,
(name, shard) -> {
var shardMetadata = shardMetadataFactory.fromRepo(snapshotName, name, shard);
log.info("Shard size: " + shardMetadata.getTotalSizeBytes());
if (shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) {
throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes);
}
return shardMetadata;
},
unpackerFactory, readerFactory, reindexer).migrateNextShard();
}
}
Loading

0 comments on commit 8703c20

Please sign in to comment.