diff --git a/DocumentsFromSnapshotMigration/docker/entrypoint.sh b/DocumentsFromSnapshotMigration/docker/entrypoint.sh index 7deb44da9..407277328 100755 --- a/DocumentsFromSnapshotMigration/docker/entrypoint.sh +++ b/DocumentsFromSnapshotMigration/docker/entrypoint.sh @@ -43,5 +43,45 @@ if [[ $RFS_COMMAND != *"--target-password"* ]]; then fi fi -echo "Executing RFS Command" -eval $RFS_COMMAND +# Extract the value passed after --s3-local-dir +S3_LOCAL_DIR=$(echo "$RFS_COMMAND" | sed -n 's/.*--s3-local-dir\s\+\("[^"]\+"\|[^ ]\+\).*/\1/p' | tr -d '"') +# Extract the value passed after --lucene-dir +LUCENE_DIR=$(echo "$RFS_COMMAND" | sed -n 's/.*--lucene-dir\s\+\("[^"]\+"\|[^ ]\+\).*/\1/p' | tr -d '"') +if [[ -n "$S3_LOCAL_DIR" ]]; then + echo "Will delete S3 local directory between runs: $S3_LOCAL_DIR" +else + echo "--s3-local-dir argument not found in RFS_COMMAND. Will not delete S3 local directory between runs." +fi + +if [[ -n "$LUCENE_DIR" ]]; then + echo "Will delete lucene local directory between runs: $LUCENE_DIR" +else + echo "--lucene-dir argument not found in RFS_COMMAND. This is required." + exit 1 +fi + +cleanup_directories() { + if [[ -n "$S3_LOCAL_DIR" ]]; then + echo "Cleaning up S3 local directory: $S3_LOCAL_DIR" + rm -rf "$S3_LOCAL_DIR" + echo "Directory $S3_LOCAL_DIR has been cleaned up." + fi + + if [[ -n "$LUCENE_DIR" ]]; then + echo "Cleaning up Lucene local directory: $LUCENE_DIR" + rm -rf "$LUCENE_DIR" + echo "Directory $LUCENE_DIR has been cleaned up." + fi +} + + + +[ -z "$RFS_COMMAND" ] && \ +{ echo "Warning: RFS_COMMAND is empty! Exiting."; exit 1; } || \ +until ! { + echo "Running command $RFS_COMMAND" + eval "$RFS_COMMAND" +}; do + echo "Cleaning up directories before the next run." 
+ cleanup_directories +done diff --git a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java index e50369a53..592c09dd1 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java @@ -45,7 +45,8 @@ @Slf4j public class RfsMigrateDocuments { - public static final int PROCESS_TIMED_OUT = 2; + public static final int PROCESS_TIMED_OUT_EXIT_CODE = 2; + public static final int NO_WORK_LEFT_EXIT_CODE = 3; public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5; public static final String LOGGING_MDC_WORKER_ID = "workerId"; @@ -184,15 +185,12 @@ public static void main(String[] args) throws Exception { var snapshotLocalDirPath = arguments.snapshotLocalDir != null ? Paths.get(arguments.snapshotLocalDir) : null; var connectionContext = arguments.targetArgs.toConnectionContext(); - try (var processManager = new LeaseExpireTrigger(workItemId -> { - log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId); - System.exit(PROCESS_TIMED_OUT); - }, Clock.systemUTC()); - var workCoordinator = new OpenSearchWorkCoordinator( - new CoordinateWorkHttpClient(connectionContext), - TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, - workerId - )) { + try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC()); + var workCoordinator = new OpenSearchWorkCoordinator( + new CoordinateWorkHttpClient(connectionContext), + TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, + workerId) + ) { MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main OpenSearchClient targetClient = new OpenSearchClient(connectionContext); DocumentReindexer reindexer = new 
DocumentReindexer(targetClient, @@ -233,12 +231,20 @@ public static void main(String[] args) throws Exception { unpackerFactory, arguments.maxShardSizeBytes, context); + } catch (NoWorkLeftException e) { + log.atWarn().setMessage("No work left to acquire. Exiting with error code to signal that.").log(); + System.exit(NO_WORK_LEFT_EXIT_CODE); } catch (Exception e) { log.atError().setMessage("Unexpected error running RfsWorker").setCause(e).log(); throw e; } } + private static void exitOnLeaseTimeout(String workItemId) { + log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId); + System.exit(PROCESS_TIMED_OUT_EXIT_CODE); + } + private static RootDocumentMigrationContext makeRootContext(Args arguments, String workerId) { var compositeContextTracker = new CompositeContextTracker( new ActiveContextTracker(), diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java index b577ddd3e..481e144dc 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java @@ -10,9 +10,11 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -24,6 +26,9 @@ import org.opensearch.testcontainers.OpensearchContainer; import eu.rekawek.toxiproxy.model.ToxicDirection; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import 
org.jetbrains.annotations.NotNull; import org.testcontainers.containers.Network; @@ -47,6 +52,37 @@ enum FailHow { WITH_DELAYS } + @AllArgsConstructor + @Getter + private static class RunData { + Path tempDirSnapshot; + Path tempDirLucene; + ToxiProxyWrapper proxyContainer; + } + + @Test + @Tag("longTest") + public void testExitsZeroThenThreeForSimpleSetup() throws Exception { + testProcess(3, + d -> { + var firstExitCode = + runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER); + Assertions.assertEquals(0, firstExitCode); + for (int i=0; i<10; ++i) { + var secondExitCode = + runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER); + if (secondExitCode != 0) { + var lastErrorCode = + runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER); + Assertions.assertEquals(secondExitCode, lastErrorCode); + return lastErrorCode; + } + } + Assertions.fail("Ran for many test iterations and didn't get a No Work Available exit code"); + return -1; // won't be evaluated + }); + } + @ParameterizedTest @CsvSource(value = { // This test will go through a proxy that doesn't add any defects and the process will use defaults "NEVER, 0", @@ -62,6 +98,12 @@ "WITH_DELAYS, 2" }) public void testProcessExitsAsExpected(String failAfterString, int expectedExitCode) throws Exception { final var failHow = FailHow.valueOf(failAfterString); + testProcess(expectedExitCode, + d -> runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, failHow)); + } + + @SneakyThrows + private void testProcess(int expectedExitCode, Function<RunData, Integer> processRunner) { final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking(); var sourceImageArgs = makeParamsForBase(SearchClusterContainer.ES_V7_10_2); @@ -108,7 +150,7 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC
esSourceContainer.copySnapshotData(tempDirSnapshot.toString()); - int actualExitCode = runProcessAgainstToxicTarget(tempDirSnapshot, tempDirLucene, proxyContainer, failHow); + int actualExitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, proxyContainer)); log.atInfo().setMessage("Process exited with code: " + actualExitCode).log(); // Check if the exit code is as expected @@ -123,12 +165,13 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC } } + @SneakyThrows private static int runProcessAgainstToxicTarget( Path tempDirSnapshot, Path tempDirLucene, ToxiProxyWrapper proxyContainer, - FailHow failHow - ) throws IOException, InterruptedException { + FailHow failHow) + { String targetAddress = proxyContainer.getProxyUriAsString(); var tp = proxyContainer.getProxy(); if (failHow == FailHow.AT_STARTUP) { diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py index a95dc9f8d..70f53b4e1 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py @@ -62,7 +62,8 @@ def setup_backfill(request): assert metadata_result.success backfill_start_result: CommandResult = backfill.start() assert backfill_start_result.success - backfill_scale_result: CommandResult = backfill.scale(units=10) + # small enough to allow containers to be reused, big enough to test scaling out + backfill_scale_result: CommandResult = backfill.scale(units=2) assert backfill_scale_result.success