Skip to content

Commit

Permalink
Merge pull request #3973 from gchq/3968-rename-compaction-task-tracker
Browse files Browse the repository at this point in the history
Issue 3968 - Rename compaction task status store to tracker
  • Loading branch information
patchwork01 authored Dec 19, 2024
2 parents 878b805 + cd8ea7d commit 713ab9b
Show file tree
Hide file tree
Showing 54 changed files with 288 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import sleeper.cdk.stack.core.ManagedPoliciesStack;
import sleeper.compaction.status.store.job.DynamoDBCompactionJobTracker;
import sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusFormat;
import sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusStore;
import sleeper.compaction.status.store.task.DynamoDBCompactionTaskTracker;
import sleeper.core.properties.instance.InstanceProperties;

import static sleeper.cdk.util.Utils.removalPolicy;
Expand Down Expand Up @@ -77,7 +77,7 @@ public CompactionTrackerStack(

tasksTable = Table.Builder
.create(this, "DynamoDBCompactionTaskStatusTable")
.tableName(DynamoDBCompactionTaskStatusStore.taskStatusTableName(instanceId))
.tableName(DynamoDBCompactionTaskTracker.taskStatusTableName(instanceId))
.removalPolicy(removalPolicy)
.billingMode(BillingMode.PAY_PER_REQUEST)
.partitionKey(Attribute.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;

import sleeper.compaction.core.task.CompactionTaskStatusStore;
import sleeper.compaction.status.store.job.CompactionJobTrackerFactory;
import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory;
import sleeper.compaction.status.store.task.CompactionTaskTrackerFactory;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TablePropertiesProvider;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;
import sleeper.ingest.batcher.core.IngestBatcherStore;
import sleeper.ingest.batcher.store.IngestBatcherStoreFactory;
import sleeper.ingest.core.job.status.IngestJobStatusStore;
Expand All @@ -36,13 +36,13 @@ public interface AdminClientTrackerFactory {

CompactionJobTracker loadCompactionJobTracker(InstanceProperties instanceProperties);

CompactionTaskStatusStore loadCompactionTaskStatusStore(InstanceProperties instanceProperties);
CompactionTaskTracker loadCompactionTaskTracker(InstanceProperties instanceProperties);

IngestJobStatusStore loadIngestJobStatusStore(InstanceProperties instanceProperties);

IngestTaskStatusStore loadIngestTaskStatusStore(InstanceProperties instanceProperties);

Optional<IngestBatcherStore> loadIngestBatcherStatusStore(InstanceProperties properties, TablePropertiesProvider tablePropertiesProvider);
Optional<IngestBatcherStore> loadIngestBatcherStore(InstanceProperties properties, TablePropertiesProvider tablePropertiesProvider);

static AdminClientTrackerFactory from(AmazonDynamoDB dynamoDB) {
return new AdminClientTrackerFactory() {
Expand All @@ -52,8 +52,8 @@ public CompactionJobTracker loadCompactionJobTracker(InstanceProperties instance
}

@Override
public CompactionTaskStatusStore loadCompactionTaskStatusStore(InstanceProperties instanceProperties) {
return CompactionTaskStatusStoreFactory.getStatusStore(dynamoDB, instanceProperties);
public CompactionTaskTracker loadCompactionTaskTracker(InstanceProperties instanceProperties) {
return CompactionTaskTrackerFactory.getTracker(dynamoDB, instanceProperties);
}

@Override
Expand All @@ -67,7 +67,7 @@ public IngestTaskStatusStore loadIngestTaskStatusStore(InstanceProperties instan
}

@Override
public Optional<IngestBatcherStore> loadIngestBatcherStatusStore(InstanceProperties properties, TablePropertiesProvider tablePropertiesProvider) {
public Optional<IngestBatcherStore> loadIngestBatcherStore(InstanceProperties properties, TablePropertiesProvider tablePropertiesProvider) {
return IngestBatcherStoreFactory.getStore(dynamoDB, properties, tablePropertiesProvider);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private void runCompactionJobStatusReport(InstanceProperties properties, TableSt
}

private void runCompactionTaskStatusReport(InstanceProperties properties, CompactionTaskQuery queryType) {
new CompactionTaskStatusReport(trackers.loadCompactionTaskStatusStore(properties),
new CompactionTaskStatusReport(trackers.loadCompactionTaskTracker(properties),
new StandardCompactionTaskStatusReporter(out.printStream()), queryType).run();
confirmReturnToMainScreen(out, in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void chooseArgsAndPrint(String instanceId) throws InterruptedException {

if (propertiesOpt.isPresent()) {
InstanceProperties properties = propertiesOpt.get();
Optional<IngestBatcherStore> ingestBatcherStoreOpt = trackers.loadIngestBatcherStatusStore(properties,
Optional<IngestBatcherStore> ingestBatcherStoreOpt = trackers.loadIngestBatcherStore(properties,
store.createTablePropertiesProvider(properties));
if (ingestBatcherStoreOpt.isEmpty()) {
out.println("Ingest batcher stack not enabled. Please enable the optional stack IngestBatcherStack.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import software.amazon.awssdk.services.sqs.SqsClient;

import sleeper.compaction.status.store.job.DynamoDBCompactionJobTrackerCreator;
import sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusStoreCreator;
import sleeper.compaction.status.store.task.DynamoDBCompactionTaskTrackerCreator;
import sleeper.core.properties.instance.InstanceProperties;

import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
Expand All @@ -44,7 +44,7 @@ public static CompactionDockerStack from(InstanceProperties instanceProperties,

public void deploy() {
DynamoDBCompactionJobTrackerCreator.create(instanceProperties, dynamoDB);
DynamoDBCompactionTaskStatusStoreCreator.create(instanceProperties, dynamoDB);
DynamoDBCompactionTaskTrackerCreator.create(instanceProperties, dynamoDB);
String queueName = "sleeper-" + instanceProperties.get(ID) + "-CompactionJobQ";
String queueUrl = sqsClient.createQueue(request -> request.queueName(queueName)).queueUrl();
instanceProperties.set(COMPACTION_JOB_QUEUE_URL, queueUrl);
Expand All @@ -53,7 +53,7 @@ public void deploy() {
@Override
public void tearDown() {
DynamoDBCompactionJobTrackerCreator.tearDown(instanceProperties, dynamoDB);
DynamoDBCompactionTaskStatusStoreCreator.tearDown(instanceProperties, dynamoDB);
DynamoDBCompactionTaskTrackerCreator.tearDown(instanceProperties, dynamoDB);
sqsClient.deleteQueue(request -> request.queueUrl(instanceProperties.get(COMPACTION_JOB_QUEUE_URL)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,30 @@
import sleeper.clients.status.report.compaction.task.CompactionTaskQuery;
import sleeper.clients.status.report.compaction.task.CompactionTaskStatusReportArguments;
import sleeper.clients.status.report.compaction.task.CompactionTaskStatusReporter;
import sleeper.compaction.core.task.CompactionTaskStatusStore;
import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory;
import sleeper.compaction.status.store.task.CompactionTaskTrackerFactory;
import sleeper.configuration.properties.S3InstanceProperties;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;

import static sleeper.configuration.utils.AwsV1ClientHelper.buildAwsV1Client;

public class CompactionTaskStatusReport {

private final CompactionTaskStatusStore store;
private final CompactionTaskTracker tracker;
private final CompactionTaskStatusReporter reporter;
private final CompactionTaskQuery query;

public CompactionTaskStatusReport(
CompactionTaskStatusStore store,
CompactionTaskTracker tracker,
CompactionTaskStatusReporter reporter,
CompactionTaskQuery query) {
this.store = store;
this.tracker = tracker;
this.reporter = reporter;
this.query = query;
}

public void run() {
reporter.report(query, query.run(store));
reporter.report(query, query.run(tracker));
}

public static void main(String[] args) {
Expand All @@ -65,8 +65,8 @@ public static void main(String[] args) {

try {
InstanceProperties instanceProperties = S3InstanceProperties.loadGivenInstanceId(s3Client, arguments.getInstanceId());
CompactionTaskStatusStore statusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties);
new CompactionTaskStatusReport(statusStore, arguments.getReporter(), arguments.getQuery()).run();
CompactionTaskTracker tracker = CompactionTaskTrackerFactory.getTracker(dynamoDBClient, instanceProperties);
new CompactionTaskStatusReport(tracker, arguments.getReporter(), arguments.getQuery()).run();
} finally {
s3Client.shutdown();
dynamoDBClient.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@
import sleeper.clients.status.report.compaction.task.StandardCompactionTaskStatusReporter;
import sleeper.clients.status.report.job.query.JobQuery;
import sleeper.clients.status.report.partitions.PartitionsStatusReporter;
import sleeper.compaction.core.task.CompactionTaskStatusStore;
import sleeper.compaction.status.store.job.CompactionJobTrackerFactory;
import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory;
import sleeper.compaction.status.store.task.CompactionTaskTrackerFactory;
import sleeper.configuration.properties.S3InstanceProperties;
import sleeper.configuration.properties.S3TableProperties;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.properties.table.TablePropertiesProvider;
import sleeper.core.statestore.StateStore;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;
import sleeper.statestore.StateStoreFactory;
import sleeper.task.common.QueueMessageCount;

Expand All @@ -56,22 +56,22 @@ public class StatusReport {
private final boolean verbose;
private final StateStore stateStore;
private final CompactionJobTracker compactionJobTracker;
private final CompactionTaskStatusStore compactionTaskStatusStore;
private final CompactionTaskTracker compactionTaskTracker;
private final SqsClient sqsClient;
private final QueueMessageCount.Client messageCount;
private final TablePropertiesProvider tablePropertiesProvider;

public StatusReport(
InstanceProperties instanceProperties, TableProperties tableProperties,
boolean verbose, StateStore stateStore,
CompactionJobTracker compactionJobTracker, CompactionTaskStatusStore compactionTaskStatusStore,
CompactionJobTracker compactionJobTracker, CompactionTaskTracker compactionTaskTracker,
SqsClient sqsClient, QueueMessageCount.Client messageCount, TablePropertiesProvider tablePropertiesProvider) {
this.instanceProperties = instanceProperties;
this.tableProperties = tableProperties;
this.verbose = verbose;
this.stateStore = stateStore;
this.compactionJobTracker = compactionJobTracker;
this.compactionTaskStatusStore = compactionTaskStatusStore;
this.compactionTaskTracker = compactionTaskTracker;
this.sqsClient = sqsClient;
this.messageCount = messageCount;
this.tablePropertiesProvider = tablePropertiesProvider;
Expand All @@ -92,7 +92,7 @@ private void run() {
JobQuery.Type.UNFINISHED).run();

// Tasks
new CompactionTaskStatusReport(compactionTaskStatusStore,
new CompactionTaskStatusReport(compactionTaskTracker,
new StandardCompactionTaskStatusReporter(System.out),
CompactionTaskQuery.UNFINISHED).run();

Expand Down Expand Up @@ -120,11 +120,11 @@ public static void main(String[] args) {
StateStoreFactory stateStoreFactory = new StateStoreFactory(instanceProperties, s3Client, dynamoDBClient, new Configuration());
StateStore stateStore = stateStoreFactory.getStateStore(tableProperties);
CompactionJobTracker compactionJobTracker = CompactionJobTrackerFactory.getTracker(dynamoDBClient, instanceProperties);
CompactionTaskStatusStore compactionTaskStatusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties);
CompactionTaskTracker compactionTaskTracker = CompactionTaskTrackerFactory.getTracker(dynamoDBClient, instanceProperties);

StatusReport statusReport = new StatusReport(
instanceProperties, tableProperties, verbose,
stateStore, compactionJobTracker, compactionTaskStatusStore,
stateStore, compactionJobTracker, compactionTaskTracker,
sqsClient, QueueMessageCount.withSqsClient(sqsClientV1), tablePropertiesProvider);
statusReport.run();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
*/
package sleeper.clients.status.report.compaction.task;

import sleeper.compaction.core.task.CompactionTaskStatus;
import sleeper.compaction.core.task.CompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.CompactionTaskStatus;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;

import java.time.Instant;
import java.util.List;

@FunctionalInterface
public interface CompactionTaskQuery {
CompactionTaskQuery UNFINISHED = CompactionTaskStatusStore::getTasksInProgress;
CompactionTaskQuery ALL = CompactionTaskStatusStore::getAllTasks;
CompactionTaskQuery UNFINISHED = CompactionTaskTracker::getTasksInProgress;
CompactionTaskQuery ALL = CompactionTaskTracker::getAllTasks;

List<CompactionTaskStatus> run(CompactionTaskStatusStore store);
List<CompactionTaskStatus> run(CompactionTaskTracker tracker);

static CompactionTaskQuery from(String type) {
switch (type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package sleeper.clients.status.report.compaction.task;

import sleeper.compaction.core.task.CompactionTaskStatus;
import sleeper.core.tracker.compaction.task.CompactionTaskStatus;

import java.io.PrintStream;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.google.gson.Gson;

import sleeper.clients.util.ClientsGsonConfig;
import sleeper.compaction.core.task.CompactionTaskStatus;
import sleeper.core.tracker.compaction.task.CompactionTaskStatus;

import java.io.PrintStream;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import sleeper.clients.util.table.TableField;
import sleeper.clients.util.table.TableRow;
import sleeper.clients.util.table.TableWriterFactory;
import sleeper.compaction.core.task.CompactionTaskStatus;
import sleeper.core.record.process.AverageRecordRate;
import sleeper.core.tracker.compaction.task.CompactionTaskStatus;

import java.io.PrintStream;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import sleeper.clients.admin.testutils.RunAdminClient;
import sleeper.compaction.core.job.CompactionJob;
import sleeper.compaction.core.job.CompactionJobTestDataHelper;
import sleeper.compaction.core.task.CompactionTaskStatus;
import sleeper.compaction.core.testutils.InMemoryCompactionTaskStatusStore;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.tracker.compaction.job.InMemoryCompactionJobTracker;
import sleeper.core.tracker.compaction.task.CompactionTaskStatus;
import sleeper.core.tracker.compaction.task.InMemoryCompactionTaskTracker;

import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -158,7 +158,7 @@ private RunAdminClient runCompactionJobStatusReport() {
@Nested
@DisplayName("Compaction task status report")
class CompactionTaskStatusReport {
private final InMemoryCompactionTaskStatusStore compactionTaskStatusStore = new InMemoryCompactionTaskStatusStore();
private final InMemoryCompactionTaskTracker compactionTaskTracker = new InMemoryCompactionTaskTracker();

private List<CompactionTaskStatus> exampleTaskStartedStatuses() {
return List.of(startedTask("task-1", "2023-03-15T18:53:12.001Z"));
Expand All @@ -167,7 +167,7 @@ private List<CompactionTaskStatus> exampleTaskStartedStatuses() {
@Test
void shouldRunCompactionTaskStatusReportWithQueryTypeAll() throws Exception {
// Given
exampleTaskStartedStatuses().forEach(compactionTaskStatusStore::taskStarted);
exampleTaskStartedStatuses().forEach(compactionTaskTracker::taskStarted);

// When/Then
String output = runCompactionTaskStatusReport()
Expand All @@ -189,7 +189,7 @@ void shouldRunCompactionTaskStatusReportWithQueryTypeAll() throws Exception {
@Test
void shouldRunCompactionTaskStatusReportWithQueryTypeUnfinished() throws Exception {
// Given
exampleTaskStartedStatuses().forEach(compactionTaskStatusStore::taskStarted);
exampleTaskStartedStatuses().forEach(compactionTaskTracker::taskStarted);

// When/Then
String output = runCompactionTaskStatusReport()
Expand All @@ -210,7 +210,7 @@ private RunAdminClient runCompactionTaskStatusReport() {
InstanceProperties properties = createValidInstanceProperties();
setInstanceProperties(properties);
return runClient().enterPrompts(COMPACTION_STATUS_REPORT_OPTION, COMPACTION_TASK_STATUS_REPORT_OPTION)
.tracker(compactionTaskStatusStore);
.tracker(compactionTaskTracker);
}
}

Expand Down
Loading

0 comments on commit 713ab9b

Please sign in to comment.