Skip to content

Commit

Permalink
Merge pull request #3872 from gchq/3867-clear-instance-on-first-connect
Browse files Browse the repository at this point in the history
Issue 3867 - Clear Sleeper instance on first connect in system tests
  • Loading branch information
patchwork01 authored Dec 6, 2024
2 parents d68923e + 6b4cc25 commit 3a71581
Show file tree
Hide file tree
Showing 14 changed files with 541 additions and 66 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
],
"java.completion.favoriteStaticMembers": [
"java.util.stream.Collectors.*",
"java.util.function.Predicate.*",
"org.assertj.core.api.Assertions.*",
"org.mockito.Mockito.*",
"org.mockito.ArgumentMatchers.*",
Expand Down
109 changes: 109 additions & 0 deletions java/core/src/main/java/sleeper/core/deploy/SqsQueues.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.core.deploy;

import sleeper.core.properties.instance.InstanceProperty;

import java.util.List;
import java.util.stream.Stream;

import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.BULK_IMPORT_EKS_JOB_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.BULK_IMPORT_EMR_JOB_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.BULK_IMPORT_EMR_SERVERLESS_JOB_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.BULK_IMPORT_PERSISTENT_EMR_JOB_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_CREATION_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_CREATION_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_PENDING_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_PENDING_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.FIND_PARTITIONS_TO_SPLIT_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.FIND_PARTITIONS_TO_SPLIT_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.GARBAGE_COLLECTOR_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.GARBAGE_COLLECTOR_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.INGEST_BATCHER_SUBMIT_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.INGEST_BATCHER_SUBMIT_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.INGEST_JOB_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.INGEST_JOB_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.LEAF_PARTITION_QUERY_QUEUE_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.LEAF_PARTITION_QUERY_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.PARTITION_SPLITTING_JOB_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.PARTITION_SPLITTING_JOB_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.QUERY_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.QUERY_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.QUERY_RESULTS_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TABLE_METRICS_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TABLE_METRICS_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_SNAPSHOT_CREATION_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_SNAPSHOT_CREATION_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_SNAPSHOT_DELETION_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_SNAPSHOT_DELETION_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_TRANSACTION_DELETION_DLQ_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_TRANSACTION_DELETION_QUEUE_URL;

/**
* Definitions of SQS queues deployed in a Sleeper instance. Used when purging all queues in an instance.
*/
public class SqsQueues {

private SqsQueues() {
}

public static final List<InstanceProperty> QUEUE_URL_PROPERTIES = List.of(
TRANSACTION_LOG_SNAPSHOT_CREATION_QUEUE_URL,
TRANSACTION_LOG_SNAPSHOT_DELETION_QUEUE_URL,
TRANSACTION_LOG_TRANSACTION_DELETION_QUEUE_URL,
STATESTORE_COMMITTER_QUEUE_URL,
TABLE_METRICS_QUEUE_URL,
QUERY_QUEUE_URL,
QUERY_RESULTS_QUEUE_URL,
LEAF_PARTITION_QUERY_QUEUE_URL,
COMPACTION_JOB_CREATION_QUEUE_URL,
COMPACTION_JOB_QUEUE_URL,
COMPACTION_PENDING_QUEUE_URL,
FIND_PARTITIONS_TO_SPLIT_QUEUE_URL,
PARTITION_SPLITTING_JOB_QUEUE_URL,
GARBAGE_COLLECTOR_QUEUE_URL,
INGEST_JOB_QUEUE_URL,
INGEST_BATCHER_SUBMIT_QUEUE_URL,
BULK_IMPORT_EMR_JOB_QUEUE_URL,
BULK_IMPORT_EMR_SERVERLESS_JOB_QUEUE_URL,
BULK_IMPORT_PERSISTENT_EMR_JOB_QUEUE_URL,
BULK_IMPORT_EKS_JOB_QUEUE_URL);

public static final List<InstanceProperty> DEAD_LETTER_QUEUE_URL_PROPERTIES = List.of(
TRANSACTION_LOG_SNAPSHOT_CREATION_DLQ_URL,
TRANSACTION_LOG_SNAPSHOT_DELETION_DLQ_URL,
TRANSACTION_LOG_TRANSACTION_DELETION_DLQ_URL,
STATESTORE_COMMITTER_DLQ_URL,
TABLE_METRICS_DLQ_URL,
QUERY_DLQ_URL,
LEAF_PARTITION_QUERY_QUEUE_DLQ_URL,
COMPACTION_JOB_CREATION_DLQ_URL,
COMPACTION_JOB_DLQ_URL,
COMPACTION_PENDING_DLQ_URL,
FIND_PARTITIONS_TO_SPLIT_DLQ_URL,
PARTITION_SPLITTING_JOB_DLQ_URL,
GARBAGE_COLLECTOR_DLQ_URL,
INGEST_JOB_DLQ_URL,
INGEST_BATCHER_SUBMIT_DLQ_URL);

public static final List<InstanceProperty> ALL_QUEUE_URL_PROPERTIES = Stream.of(
QUEUE_URL_PROPERTIES, DEAD_LETTER_QUEUE_URL_PROPERTIES).flatMap(List::stream).toList();

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@
import software.amazon.awssdk.services.ecs.EcsClient;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

import sleeper.clients.deploy.InvokeLambda;
import sleeper.compaction.core.job.CompactionJob;
Expand All @@ -45,6 +42,7 @@
import sleeper.core.util.ObjectFactory;
import sleeper.core.util.ObjectFactoryException;
import sleeper.core.util.PollWithRetries;
import sleeper.systemtest.drivers.util.AwsDrainSqsQueue;
import sleeper.systemtest.drivers.util.SystemTestClients;
import sleeper.systemtest.dsl.compaction.CompactionDriver;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;
Expand All @@ -53,9 +51,7 @@

import java.io.IOException;
import java.util.List;
import java.util.stream.Stream;

import static java.util.function.Predicate.not;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_CREATION_TRIGGER_LAMBDA_FUNCTION;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_TASK_CREATION_LAMBDA_FUNCTION;
Expand Down Expand Up @@ -157,36 +153,11 @@ public void scaleToZero() {
public List<CompactionJob> drainJobsQueueForWholeInstance() {
String queueUrl = instance.getInstanceProperties().get(COMPACTION_JOB_QUEUE_URL);
LOGGER.info("Draining compaction jobs queue: {}", queueUrl);
List<CompactionJob> jobs = Stream.iterate(
receiveJobs(queueUrl), not(List::isEmpty), lastJobs -> receiveJobs(queueUrl))
.flatMap(List::stream).toList();
LOGGER.info("Found {} compaction jobs", jobs.size());
return jobs;
}

private List<CompactionJob> receiveJobs(String queueUrl) {
ReceiveMessageResponse receiveResult = sqsClientV2.receiveMessage(request -> request
.queueUrl(queueUrl)
.maxNumberOfMessages(10)
.waitTimeSeconds(5));
List<Message> messages = receiveResult.messages();
if (messages.isEmpty()) {
return List.of();
}
DeleteMessageBatchResponse deleteResult = sqsClientV2.deleteMessageBatch(request -> request
.queueUrl(queueUrl)
.entries(messages.stream()
.map(message -> DeleteMessageBatchRequestEntry.builder()
.id(message.messageId())
.receiptHandle(message.receiptHandle())
.build())
.toList()));
if (!deleteResult.failed().isEmpty()) {
throw new RuntimeException("Failed deleting compaction job messages: " + deleteResult.failed());
}
return messages.stream()
List<CompactionJob> jobs = AwsDrainSqsQueue.drainQueueForWholeInstance(sqsClientV2, queueUrl)
.map(Message::body)
.map(serDe::fromJson)
.toList();
LOGGER.info("Found {} jobs", jobs.size());
return jobs;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.systemtest.drivers.instance;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.DeleteItemRequest;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.sqs.SqsClient;

import sleeper.configuration.properties.S3InstanceProperties;
import sleeper.configuration.table.index.DynamoDBTableIndex;
import sleeper.core.deploy.SqsQueues;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.util.PollWithRetries;
import sleeper.statestore.dynamodb.DynamoDBStateStore;
import sleeper.statestore.s3.S3StateStore;
import sleeper.statestore.transactionlog.DynamoDBTransactionLogStateStore;
import sleeper.statestore.transactionlog.snapshots.DynamoDBTransactionLogSnapshotMetadataStore;
import sleeper.systemtest.drivers.util.AwsDrainSqsQueue;
import sleeper.systemtest.drivers.util.SystemTestClients;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.util.function.Predicate.not;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.ACTIVE_FILES_TABLENAME;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.FILE_REFERENCE_COUNT_TABLENAME;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.PARTITION_TABLENAME;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.REVISION_TABLENAME;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TABLE_ID_INDEX_DYNAMO_TABLENAME;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TABLE_NAME_INDEX_DYNAMO_TABLENAME;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TABLE_ONLINE_INDEX_DYNAMO_TABLENAME;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_ALL_SNAPSHOTS_TABLENAME;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_FILES_TABLENAME;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_LATEST_SNAPSHOTS_TABLENAME;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_PARTITIONS_TABLENAME;
import static sleeper.dynamodb.tools.DynamoDBUtils.streamPagedResults;

public class AwsResetInstanceOnFirstConnect {
public static final Logger LOGGER = LoggerFactory.getLogger(AwsResetInstanceOnFirstConnect.class);
private final S3Client s3;
private final AmazonDynamoDB dynamoDB;
private final SqsClient sqs;

public AwsResetInstanceOnFirstConnect(SystemTestClients clients) {
this.s3 = clients.getS3V2();
this.dynamoDB = clients.getDynamoDB();
this.sqs = clients.getSqsV2();
}

public void reset(InstanceProperties instanceProperties) {
deleteAllTables(instanceProperties);
drainAllQueues(instanceProperties);
}

private void deleteAllTables(InstanceProperties instanceProperties) {
clearBucket(instanceProperties.get(DATA_BUCKET));
clearBucket(instanceProperties.get(CONFIG_BUCKET), key -> !S3InstanceProperties.S3_INSTANCE_PROPERTIES_FILE.equals(key));
clearTable(instanceProperties.get(ACTIVE_FILES_TABLENAME), DynamoDBStateStore.TABLE_ID, DynamoDBStateStore.PARTITION_ID_AND_FILENAME);
clearTable(instanceProperties.get(FILE_REFERENCE_COUNT_TABLENAME), DynamoDBStateStore.TABLE_ID, DynamoDBStateStore.FILE_NAME);
clearTable(instanceProperties.get(PARTITION_TABLENAME), DynamoDBStateStore.TABLE_ID, DynamoDBStateStore.PARTITION_ID);
clearTable(instanceProperties.get(REVISION_TABLENAME), S3StateStore.TABLE_ID, S3StateStore.REVISION_ID_KEY);
clearTable(instanceProperties.get(TABLE_NAME_INDEX_DYNAMO_TABLENAME), DynamoDBTableIndex.TABLE_NAME_FIELD);
clearTable(instanceProperties.get(TABLE_ID_INDEX_DYNAMO_TABLENAME), DynamoDBTableIndex.TABLE_ID_FIELD);
clearTable(instanceProperties.get(TABLE_ONLINE_INDEX_DYNAMO_TABLENAME),
DynamoDBTableIndex.TABLE_ONLINE_FIELD, DynamoDBTableIndex.TABLE_NAME_FIELD);
clearTable(instanceProperties.get(TRANSACTION_LOG_FILES_TABLENAME),
DynamoDBTransactionLogStateStore.TABLE_ID, DynamoDBTransactionLogStateStore.TRANSACTION_NUMBER);
clearTable(instanceProperties.get(TRANSACTION_LOG_PARTITIONS_TABLENAME),
DynamoDBTransactionLogStateStore.TABLE_ID, DynamoDBTransactionLogStateStore.TRANSACTION_NUMBER);
clearTable(instanceProperties.get(TRANSACTION_LOG_ALL_SNAPSHOTS_TABLENAME),
DynamoDBTransactionLogSnapshotMetadataStore.TABLE_ID_AND_SNAPSHOT_TYPE, DynamoDBTransactionLogSnapshotMetadataStore.TRANSACTION_NUMBER);
clearTable(instanceProperties.get(TRANSACTION_LOG_LATEST_SNAPSHOTS_TABLENAME), DynamoDBTransactionLogSnapshotMetadataStore.TABLE_ID);
waitForTablesToEmpty(
instanceProperties.get(TABLE_NAME_INDEX_DYNAMO_TABLENAME),
instanceProperties.get(TABLE_ID_INDEX_DYNAMO_TABLENAME),
instanceProperties.get(TABLE_ONLINE_INDEX_DYNAMO_TABLENAME));
}

private void drainAllQueues(InstanceProperties instanceProperties) {
SqsQueues.ALL_QUEUE_URL_PROPERTIES.stream()
.map(instanceProperties::get)
.filter(queueUrl -> queueUrl != null)
.parallel().forEach(queueUrl -> AwsDrainSqsQueue.emptyQueueForWholeInstance(sqs, queueUrl));
}

private void clearBucket(String bucketName) {
clearBucket(bucketName, key -> true);
}

private void clearBucket(String bucketName, Predicate<String> deleteKey) {
LOGGER.info("Clearing S3 bucket: {}", bucketName);
s3.listObjectsV2Paginator(req -> req.bucket(bucketName))
.stream().map(ListObjectsV2Response::contents)
.map(foundObjects -> foundObjects.stream()
.map(S3Object::key)
.filter(deleteKey)
.map(key -> ObjectIdentifier.builder().key(key).build())
.collect(Collectors.toUnmodifiableList()))
.filter(not(List::isEmpty))
.forEach(objectsToDelete -> s3.deleteObjects(req -> req
.bucket(bucketName)
.delete(del -> del.objects(objectsToDelete))));
}

private void clearTable(String tableName, String... keyFields) {
LOGGER.info("Clearing DynamoDB table: {}", tableName);
streamPagedResults(dynamoDB, new ScanRequest().withTableName(tableName))
.flatMap(result -> result.getItems().stream())
.parallel()
.map(item -> getKey(item, List.of(keyFields)))
.forEach(key -> dynamoDB.deleteItem(new DeleteItemRequest()
.withTableName(tableName)
.withKey(key)));
LOGGER.info("Cleared DynamoDB table: {}", tableName);
}

private void waitForTablesToEmpty(String... tableNames) {
for (String tableName : tableNames) {
waitForTableToEmpty(tableName);
}
}

private void waitForTableToEmpty(String tableName) {
LOGGER.info("Waiting for DynamoDB table to empty: {}", tableName);
try {
PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(1), Duration.ofSeconds(30))
.pollUntil("table is empty", () -> dynamoDB.scan(new ScanRequest().withTableName(tableName).withLimit(1))
.getItems().isEmpty());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for table to empty: " + tableName, e);
}
}

private static Map<String, AttributeValue> getKey(Map<String, AttributeValue> item, List<String> keyFields) {
return keyFields.stream()
.map(name -> Map.entry(name, item.get(name)))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
}

}
Loading

0 comments on commit 3a71581

Please sign in to comment.