Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run the RFS Container's DocumentMigration application repeatedly as long as it's successful #1047

Merged
46 changes: 44 additions & 2 deletions DocumentsFromSnapshotMigration/docker/entrypoint.sh
AndreKurait marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,47 @@ if [[ $RFS_COMMAND != *"--target-password"* ]]; then
fi
fi

echo "Executing RFS Command"
eval $RFS_COMMAND
# Extract the value passed after --s3-local-dir
S3_LOCAL_DIR=$(echo "$RFS_COMMAND" | sed -n 's/.*--s3-local-dir\s\+\("[^"]\+"\|[^ ]\+\).*/\1/p' | tr -d '"')
# Extract the value passed after --lucene-dir
LUCENE_DIR=$(echo "$RFS_COMMAND" | sed -n 's/.*--lucene-dir\s\+\("[^"]\+"\|[^ ]\+\).*/\1/p' | tr -d '"')
if [[ -n "$S3_LOCAL_DIR" ]]; then
echo "Will delete S3 local directory between runs: $S3_LOCAL_DIR"
rm -rf "$S3_LOCAL_DIR"
echo "Directory $S3_LOCAL_DIR has been deleted."
else
echo "--s3-local-dir argument not found in RFS_COMMAND. Will not delete S3 local directory between runs."
fi

if [[ -n "$LUCENE_DIR" ]]; then
echo "Will delete lucene local directory between runs: $LUCENE_DIR"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker, but - if we pre-delete the S3 directory (lines 52-23), why not pre-delete this one as well?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed lines 52-53. We'll run document migration once before deleting anything

else
echo "--lucene-dir argument not found in RFS_COMMAND. This is required."
exit 1
fi

cleanup_directories() {
if [[ -n "$S3_LOCAL_DIR" ]]; then
echo "Cleaning up S3 local directory: $S3_LOCAL_DIR"
rm -rf "$S3_LOCAL_DIR"
echo "Directory $S3_LOCAL_DIR has been cleaned up."
fi

if [[ -n "$LUCENE_DIR" ]]; then
echo "Cleaning up Lucene local directory: $LUCENE_DIR"
rm -rf "$LUCENE_DIR"
echo "Directory $LUCENE_DIR has been cleaned up."
fi
}



[ -z "$RFS_COMMAND" ] && \
{ echo "Warning: RFS_COMMAND is empty! Exiting."; exit 1; } || \
until ! {
echo "Running command $RFS_COMMAND"
eval "$RFS_COMMAND"
}; do
echo "Cleaning up directories before the next run."
cleanup_directories
done
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@

@Slf4j
public class RfsMigrateDocuments {
public static final int PROCESS_TIMED_OUT = 2;
public static final int PROCESS_TIMED_OUT_EXIT_CODE = 2;
public static final int NO_WORK_LEFT_EXIT_CODE = 3;
public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5;
public static final String LOGGING_MDC_WORKER_ID = "workerId";

Expand Down Expand Up @@ -184,15 +185,12 @@
var snapshotLocalDirPath = arguments.snapshotLocalDir != null ? Paths.get(arguments.snapshotLocalDir) : null;

var connectionContext = arguments.targetArgs.toConnectionContext();
try (var processManager = new LeaseExpireTrigger(workItemId -> {
log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
System.exit(PROCESS_TIMED_OUT);
}, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId
)) {
try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(

Check warning on line 189 in DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java

View check run for this annotation

Codecov / codecov/patch

DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java#L188-L189

Added lines #L188 - L189 were not covered by tests
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId)
) {
MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main
OpenSearchClient targetClient = new OpenSearchClient(connectionContext);
DocumentReindexer reindexer = new DocumentReindexer(targetClient,
Expand Down Expand Up @@ -233,12 +231,20 @@
unpackerFactory,
arguments.maxShardSizeBytes,
context);
} catch (NoWorkLeftException e) {
log.atWarn().setMessage("No work left to acquire. Exiting with error code to signal that.").log();
System.exit(NO_WORK_LEFT_EXIT_CODE);

Check warning on line 236 in DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java

View check run for this annotation

Codecov / codecov/patch

DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java#L234-L236

Added lines #L234 - L236 were not covered by tests
} catch (Exception e) {
log.atError().setMessage("Unexpected error running RfsWorker").setCause(e).log();
throw e;
}
}

private static void exitOnLeaseTimeout(String workItemId) {
log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
}

Check warning on line 246 in DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java

View check run for this annotation

Codecov / codecov/patch

DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java#L244-L246

Added lines #L244 - L246 were not covered by tests

private static RootDocumentMigrationContext makeRootContext(Args arguments, String workerId) {
var compositeContextTracker = new CompositeContextTracker(
new ActiveContextTracker(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

Expand All @@ -24,6 +26,9 @@
import org.opensearch.testcontainers.OpensearchContainer;

import eu.rekawek.toxiproxy.model.ToxicDirection;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.testcontainers.containers.Network;
Expand All @@ -47,6 +52,37 @@ enum FailHow {
WITH_DELAYS
}

@AllArgsConstructor
@Getter
private static class RunData {
Path tempDirSnapshot;
Path tempDirLucene;
ToxiProxyWrapper proxyContainer;
}

@Test
@Tag("longTest")
public void testExitsZeroThenThreeForSimpleSetup() throws Exception {
testProcess(3,
d -> {
var firstExitCode =
runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER);
Assertions.assertEquals(0, firstExitCode);
for (int i=0; i<10; ++i) {
var secondExitCode =
runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER);
if (secondExitCode != 0) {
var lastErrorCode =
runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER);
Assertions.assertEquals(secondExitCode, lastErrorCode);
return lastErrorCode;
}
}
Assertions.fail("Ran for many test iterations and didn't get a No Work Available exit code");
return -1; // won't be evaluated
});
}

@ParameterizedTest
@CsvSource(value = {
// This test will go through a proxy that doesn't add any defects and the process will use defaults
Expand All @@ -62,6 +98,12 @@ enum FailHow {
"WITH_DELAYS, 2" })
public void testProcessExitsAsExpected(String failAfterString, int expectedExitCode) throws Exception {
final var failHow = FailHow.valueOf(failAfterString);
testProcess(expectedExitCode,
d -> runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, failHow));
}

@SneakyThrows
private void testProcess(int expectedExitCode, Function<RunData, Integer> processRunner) {
final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();

var sourceImageArgs = makeParamsForBase(SearchClusterContainer.ES_V7_10_2);
Expand Down Expand Up @@ -108,7 +150,7 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC

esSourceContainer.copySnapshotData(tempDirSnapshot.toString());

int actualExitCode = runProcessAgainstToxicTarget(tempDirSnapshot, tempDirLucene, proxyContainer, failHow);
int actualExitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, proxyContainer));
log.atInfo().setMessage("Process exited with code: " + actualExitCode).log();

// Check if the exit code is as expected
Expand All @@ -123,12 +165,13 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC
}
}

@SneakyThrows
private static int runProcessAgainstToxicTarget(
Path tempDirSnapshot,
Path tempDirLucene,
ToxiProxyWrapper proxyContainer,
FailHow failHow
) throws IOException, InterruptedException {
FailHow failHow)
{
String targetAddress = proxyContainer.getProxyUriAsString();
var tp = proxyContainer.getProxy();
if (failHow == FailHow.AT_STARTUP) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def setup_backfill(request):
assert metadata_result.success
backfill_start_result: CommandResult = backfill.start()
assert backfill_start_result.success
backfill_scale_result: CommandResult = backfill.scale(units=10)
# small enough to allow containers to be reused, big enough to test scaling out
backfill_scale_result: CommandResult = backfill.scale(units=2)
assert backfill_scale_result.success


Expand Down