Skip to content

Commit

Permalink
Remove IngestJob from IngestJobValidatedEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Dec 18, 2024
1 parent 9821322 commit ab7c552
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestFinishedStatusUncommitted;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.jobStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.validatedIngestStartedStatus;
import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted;
import static sleeper.parquet.utils.HadoopConfigurationLocalStackUtils.getHadoopConfiguration;

@Testcontainers
Expand Down Expand Up @@ -704,7 +703,7 @@ private void runJob(BulkImportJobRunner runner, InstanceProperties properties, B
}

private void runJob(BulkImportJobRunner runner, InstanceProperties properties, BulkImportJob job, Supplier<Instant> timeSupplier) throws IOException {
statusStore.jobValidated(ingestJobAccepted(job.toIngestJob(), validationTime).jobRunId(jobRunId).build());
statusStore.jobValidated(job.toIngestJob().acceptedEventBuilder(validationTime).jobRunId(jobRunId).build());
TablePropertiesProvider tablePropertiesProvider = S3TableProperties.createProvider(instanceProperties, s3Client, dynamoDBClient);
StateStoreProvider stateStoreProvider = StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoDBClient, conf);
AddFilesAsynchronously addFilesAsync = BulkImportJobDriver.submitFilesToCommitQueue(sqsClient, s3Client, instanceProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestFinishedStatusUncommitted;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.jobStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.validatedIngestStartedStatus;
import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted;

class BulkImportJobDriverTest {
private final InstanceProperties instanceProperties = createTestInstanceProperties();
Expand Down Expand Up @@ -227,7 +226,7 @@ void shouldCommitNewFilesAsynchronouslyWhenConfigured() throws Exception {
private void runJob(
BulkImportJob job, String jobRunId, String taskId, Instant validationTime,
BulkImportJobDriver driver) throws Exception {
statusStore.jobValidated(ingestJobAccepted(job.toIngestJob(), validationTime).jobRunId(jobRunId).build());
statusStore.jobValidated(job.toIngestJob().acceptedEventBuilder(validationTime).jobRunId(jobRunId).build());
driver.run(job, jobRunId, taskId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@
import java.util.function.Supplier;
import java.util.regex.Pattern;

import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted;
import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobRejected;

public class BulkImportExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(BulkImportExecutor.class);
private static final Predicate<String> LOWER_ALPHANUMERICS_AND_DASHES = Pattern.compile("^[a-z0-9-]+$").asPredicate();
Expand Down Expand Up @@ -78,8 +75,8 @@ public void runJob(BulkImportJob bulkImportJob, String jobRunId) {
if (!validateJob(bulkImportJob)) {
return;
}
ingestJobStatusStore.jobValidated(ingestJobAccepted(
bulkImportJob.toIngestJob(), validationTimeSupplier.get())
ingestJobStatusStore.jobValidated(bulkImportJob.toIngestJob()
.acceptedEventBuilder(validationTimeSupplier.get())
.jobRunId(jobRunId).build());
try {
LOGGER.info("Writing job with id {} to JSON file", bulkImportJob.getId());
Expand Down Expand Up @@ -123,8 +120,8 @@ private boolean validateJob(BulkImportJob bulkImportJob) {
String errorMessage = "The bulk import job failed validation with the following checks failing: \n"
+ String.join("\n", failedChecks);
LOGGER.warn(errorMessage);
ingestJobStatusStore.jobValidated(ingestJobRejected(
bulkImportJob.toIngestJob(), validationTimeSupplier.get(), failedChecks));
ingestJobStatusStore.jobValidated(bulkImportJob.toIngestJob()
.createRejectedEvent(validationTimeSupplier.get(), failedChecks));
return false;
} else {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import sleeper.ingest.core.job.status.IngestJobFailedEvent;
import sleeper.ingest.core.job.status.IngestJobFinishedEvent;
import sleeper.ingest.core.job.status.IngestJobStartedEvent;
import sleeper.ingest.core.job.status.IngestJobValidatedEvent;

import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -50,13 +51,28 @@ public static Builder builder() {
}

/**
* Creates a builder for an event when files have been added to the state store. Used with the ingest job tracker.
* Creates a builder for an event when an ingest job passed validation checks. Used with the ingest job tracker.
*
* @param writtenTime the time the files were written
* @return the builder
* @param validationTime the validation time
* @return the builder
*/
public IngestJobAddedFilesEvent.Builder addedFilesEventBuilder(Instant writtenTime) {
return IngestJobAddedFilesEvent.builder().jobId(id).tableId(tableId).writtenTime(writtenTime);
public IngestJobValidatedEvent.Builder acceptedEventBuilder(Instant validationTime) {
return validatedEventBuilder(validationTime).reasons(List.of());
}

/**
* Creates an event for when an ingest job failed validation checks. Used with the ingest job tracker.
* Unlike the accepted case, this returns the built event rather than a builder.
*
* @param validationTime the validation time
* @param reasons the reasons why the validation failed
* @return the event
*/
public IngestJobValidatedEvent createRejectedEvent(Instant validationTime, List<String> reasons) {
return validatedEventBuilder(validationTime).reasons(reasons).build();
}

/**
* Creates a builder for a validation event, pre-populated with this job's ID, table ID and file count, along
* with the given validation time. Callers set the validation failure reasons on the returned builder.
*
* @param validationTime the validation time
* @return the builder
*/
private IngestJobValidatedEvent.Builder validatedEventBuilder(Instant validationTime) {
return IngestJobValidatedEvent.builder().jobId(id).tableId(tableId).validationTime(validationTime).fileCount(getFileCount());
}

/**
Expand Down Expand Up @@ -94,6 +110,16 @@ public IngestJobStartedEvent.Builder startedAfterValidationEventBuilder(Instant
return IngestJobStartedEvent.builder().jobId(id).tableId(tableId).startTime(startTime).fileCount(getFileCount()).startOfRun(false);
}

/**
* Creates a builder for an event when files have been added to the state store. Used with the ingest job tracker.
* The builder is pre-populated with this job's ID and table ID.
*
* @param writtenTime the time the files were written
* @return the builder
*/
public IngestJobAddedFilesEvent.Builder addedFilesEventBuilder(Instant writtenTime) {
return IngestJobAddedFilesEvent.builder().jobId(id).tableId(tableId).writtenTime(writtenTime);
}

/**
* Creates a builder for an event when the job finished. Used with the ingest job tracker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package sleeper.ingest.core.job.status;

import sleeper.ingest.core.job.IngestJob;

import java.time.Instant;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -46,17 +44,6 @@ private IngestJobValidatedEvent(Builder builder) {
jsonMessage = builder.jsonMessage;
}

/**
* Creates an instance of this class for when an ingest job passed validation checks.
*
* @param job the ingest job
* @param validationTime the validation time
* @return an instance of this class
*/
public static Builder ingestJobAccepted(IngestJob job, Instant validationTime) {
return builder().job(job).validationTime(validationTime).reasons(List.of());
}

/**
* Creates an instance of this class for when an ingest job failed validation checks.
*
Expand All @@ -75,18 +62,6 @@ public static IngestJobValidatedEvent ingestJobRejected(String jobId, String jso
.build();
}

/**
* Creates an instance of this class for when an ingest job failed validation checks.
*
* @param job the ingest job
* @param validationTime the validation time
* @param reasons the list of reasons why the validation failed
* @return an instance of this class
*/
public static IngestJobValidatedEvent ingestJobRejected(IngestJob job, Instant validationTime, List<String> reasons) {
return builder().job(job).validationTime(validationTime).reasons(reasons).build();
}

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -198,18 +173,6 @@ public static final class Builder {
private Builder() {
}

/**
* Sets the job ID, table ID, and file count from the ingest job.
*
* @param job the ingest job
* @return the builder
*/
public Builder job(IngestJob job) {
return jobId(job.getId())
.tableId(job.getTableId())
.fileCount(job.getFileCount());
}

/**
* Sets the job ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestFinishedStatusUncommitted;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestStartedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.jobStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.rejectedEvent;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.rejectedRun;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.startedIngestRun;
import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted;

public class InMemoryIngestJobStatusStoreTest {

Expand Down Expand Up @@ -230,7 +228,7 @@ void shouldGetInvalidJobsWithOneRejectedJob() {
Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z");

// When
store.jobValidated(rejectedEvent(job, validationTime, "Test validation reason"));
store.jobValidated(job.createRejectedEvent(validationTime, List.of("Test validation reason")));

// Then
assertThat(store.getInvalidJobs())
Expand All @@ -248,8 +246,8 @@ void shouldGetOneInvalidJobWithOneRejectedJobAndOneAcceptedJob() {
Instant validationTime2 = Instant.parse("2022-09-22T12:02:10.000Z");

// When
store.jobValidated(rejectedEvent(job1, validationTime1, "Test validation reason"));
store.jobValidated(ingestJobAccepted(job2, validationTime2).build());
store.jobValidated(job1.createRejectedEvent(validationTime1, List.of("Test validation reason")));
store.jobValidated(job2.acceptedEventBuilder(validationTime2).build());

// Then
assertThat(store.getInvalidJobs())
Expand All @@ -267,8 +265,8 @@ void shouldGetInvalidJobsAcrossMultipleTables() {
Instant validationTime2 = Instant.parse("2022-09-22T12:00:31.000Z");

// When
store.jobValidated(rejectedEvent(job1, validationTime1, "Test reason 1"));
store.jobValidated(rejectedEvent(job2, validationTime2, "Test reason 2"));
store.jobValidated(job1.createRejectedEvent(validationTime1, List.of("Test reason 1")));
store.jobValidated(job2.createRejectedEvent(validationTime2, List.of("Test reason 2")));

// Then
assertThat(store.getInvalidJobs()).containsExactly(
Expand All @@ -285,8 +283,8 @@ void shouldNotGetJobThatWasRejectedThenAccepted() {
Instant validationTime2 = Instant.parse("2022-09-22T12:02:10.000Z");

// When
store.jobValidated(rejectedEvent(job1, validationTime1, "Test validation reason"));
store.jobValidated(ingestJobAccepted(job1, validationTime2).build());
store.jobValidated(job1.createRejectedEvent(validationTime1, List.of("Test validation reason")));
store.jobValidated(job1.acceptedEventBuilder(validationTime2).build());

// Then
assertThat(store.getInvalidJobs()).isEmpty();
Expand All @@ -299,7 +297,7 @@ void shouldGetInvalidJobWithNoTable() {
Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z");

// When
store.jobValidated(rejectedEvent(job, validationTime, "Test validation reason"));
store.jobValidated(job.createRejectedEvent(validationTime, List.of("Test validation reason")));

// Then
assertThat(store.getInvalidJobs()).containsExactly(
Expand All @@ -318,7 +316,7 @@ void shouldReportUnstartedJobWithNoValidationFailures() {
Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z");

// When
store.jobValidated(ingestJobAccepted(job, validationTime).taskId(taskId).build());
store.jobValidated(job.acceptedEventBuilder(validationTime).taskId(taskId).build());

// Then
assertThat(store.getAllJobs(tableId))
Expand All @@ -334,7 +332,7 @@ void shouldReportStartedJobWithNoValidationFailures() {
Instant startTime = Instant.parse("2022-09-22T12:00:15.000Z");

// When
store.jobValidated(ingestJobAccepted(job, validationTime).taskId(taskId).build());
store.jobValidated(job.acceptedEventBuilder(validationTime).taskId(taskId).build());
store.jobStarted(job.startedAfterValidationEventBuilder(startTime).taskId(taskId).build());

// Then
Expand All @@ -350,7 +348,7 @@ void shouldReportJobWithOneValidationFailure() {
Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z");

// When
store.jobValidated(rejectedEvent(job, validationTime, "Test validation reason"));
store.jobValidated(job.createRejectedEvent(validationTime, List.of("Test validation reason")));

// Then
assertThat(store.getAllJobs(tableId))
Expand All @@ -365,8 +363,8 @@ void shouldReportJobWithMultipleValidationFailures() {
Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z");

// When
store.jobValidated(rejectedEvent(job, validationTime,
"Test validation reason 1", "Test validation reason 2"));
store.jobValidated(job.createRejectedEvent(validationTime,
List.of("Test validation reason 1", "Test validation reason 2")));

// Then
assertThat(store.getAllJobs(tableId))
Expand All @@ -385,7 +383,7 @@ void shouldReportAcceptedJob() {
Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z");

// When
store.jobValidated(ingestJobAccepted(job, validationTime).jobRunId("test-run").build());
store.jobValidated(job.acceptedEventBuilder(validationTime).jobRunId("test-run").build());

// Then
assertThat(store.getAllJobs(tableId))
Expand All @@ -405,7 +403,7 @@ void shouldReportStartedJob() {
Instant startTime = Instant.parse("2022-09-22T12:00:15.000Z");

// When
store.jobValidated(ingestJobAccepted(job, validationTime).jobRunId(jobRunId).build());
store.jobValidated(job.acceptedEventBuilder(validationTime).jobRunId(jobRunId).build());
store.jobStarted(job.startedAfterValidationEventBuilder(startTime).jobRunId(jobRunId).taskId(taskId).build());

// Then
Expand All @@ -428,7 +426,7 @@ void shouldReportFinishedJob() {
RecordsProcessedSummary summary = summary(startTime, Duration.ofMinutes(10), 100L, 100L);

// When
store.jobValidated(ingestJobAccepted(job, validationTime).jobRunId(jobRunId).build());
store.jobValidated(job.acceptedEventBuilder(validationTime).jobRunId(jobRunId).build());
store.jobStarted(job.startedAfterValidationEventBuilder(startTime).jobRunId(jobRunId).taskId(taskId).build());
store.jobFinished(job.finishedEventBuilder(summary).jobRunId(jobRunId).taskId(taskId).numFilesWrittenByJob(2).build());

Expand Down Expand Up @@ -474,7 +472,7 @@ void shouldReportFailedJob() {
List<String> failureReasons = List.of("Something went wrong");

// When
store.jobValidated(ingestJobAccepted(job, validationTime).jobRunId(jobRunId).build());
store.jobValidated(job.acceptedEventBuilder(validationTime).jobRunId(jobRunId).build());
store.jobStarted(job.startedAfterValidationEventBuilder(startTime).jobRunId(jobRunId).taskId(taskId).build());
store.jobFailed(job.failedEventBuilder(runTime).jobRunId(jobRunId).taskId(taskId).failureReasons(failureReasons).build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,16 +499,4 @@ public static IngestJobFinishedStatus ingestFinishedStatusUncommitted(IngestJob
.build();
}

/**
* Creates an ingest job validated event for a rejected job.
*
* @param job the ingest job
* @param validationTime the validation time
* @param reasons the list of reasons
* @return an ingest job validated event for a rejected job
*/
public static IngestJobValidatedEvent rejectedEvent(IngestJob job, Instant validationTime, String... reasons) {
return IngestJobValidatedEvent.ingestJobRejected(job, validationTime, List.of(reasons));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@

import java.time.Instant;
import java.time.Period;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.rejectedEvent;

public class QueryIngestJobStatusByPeriodIT extends DynamoDBIngestJobStatusStoreTestBase {

Expand Down Expand Up @@ -127,7 +127,7 @@ void shouldExcludeRejectedIngestJobFromRangeQueryWhenRejectedTimeIsBeforeStartOf
IngestJobStatusStore store = storeWithUpdateTimes(rejectedUpdateTime);

// When
store.jobValidated(rejectedEvent(job, rejectedTime, "Test reason"));
store.jobValidated(job.createRejectedEvent(rejectedTime, List.of("Test reason")));

// Then
assertThat(store.getJobsInTimePeriod(tableId, periodStart, periodEnd))
Expand Down
Loading

0 comments on commit ab7c552

Please sign in to comment.