diff --git a/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverIT.java b/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverIT.java index e0c80cc52c..2f0f163499 100644 --- a/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverIT.java +++ b/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverIT.java @@ -123,7 +123,6 @@ import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestFinishedStatusUncommitted; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.jobStatus; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.validatedIngestStartedStatus; -import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted; import static sleeper.parquet.utils.HadoopConfigurationLocalStackUtils.getHadoopConfiguration; @Testcontainers @@ -704,7 +703,7 @@ private void runJob(BulkImportJobRunner runner, InstanceProperties properties, B } private void runJob(BulkImportJobRunner runner, InstanceProperties properties, BulkImportJob job, Supplier timeSupplier) throws IOException { - statusStore.jobValidated(ingestJobAccepted(job.toIngestJob(), validationTime).jobRunId(jobRunId).build()); + statusStore.jobValidated(job.toIngestJob().acceptedEventBuilder(validationTime).jobRunId(jobRunId).build()); TablePropertiesProvider tablePropertiesProvider = S3TableProperties.createProvider(instanceProperties, s3Client, dynamoDBClient); StateStoreProvider stateStoreProvider = StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoDBClient, conf); AddFilesAsynchronously addFilesAsync = BulkImportJobDriver.submitFilesToCommitQueue(sqsClient, s3Client, instanceProperties); diff --git a/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverTest.java b/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverTest.java index 59ed78ffed..d18ca7b12f 100644 --- a/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverTest.java +++ b/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverTest.java @@ -61,7 +61,6 @@ import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestFinishedStatusUncommitted; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.jobStatus; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.validatedIngestStartedStatus; -import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted; class BulkImportJobDriverTest { private final InstanceProperties instanceProperties = createTestInstanceProperties(); @@ -227,7 +226,7 @@ void shouldCommitNewFilesAsynchronouslyWhenConfigured() throws Exception { private void runJob( BulkImportJob job, String jobRunId, String taskId, Instant validationTime, BulkImportJobDriver driver) throws Exception { - statusStore.jobValidated(ingestJobAccepted(job.toIngestJob(), validationTime).jobRunId(jobRunId).build()); + statusStore.jobValidated(job.toIngestJob().acceptedEventBuilder(validationTime).jobRunId(jobRunId).build()); driver.run(job, jobRunId, taskId); } diff --git a/java/bulk-import/bulk-import-starter/src/main/java/sleeper/bulkimport/starter/executor/BulkImportExecutor.java b/java/bulk-import/bulk-import-starter/src/main/java/sleeper/bulkimport/starter/executor/BulkImportExecutor.java index e3df0d3c64..c6c7ae646c 100644 --- a/java/bulk-import/bulk-import-starter/src/main/java/sleeper/bulkimport/starter/executor/BulkImportExecutor.java +++ b/java/bulk-import/bulk-import-starter/src/main/java/sleeper/bulkimport/starter/executor/BulkImportExecutor.java @@ -37,9 +37,6 @@ import java.util.function.Supplier; import java.util.regex.Pattern; -import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted; -import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobRejected; - public class BulkImportExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(BulkImportExecutor.class); private static final Predicate LOWER_ALPHANUMERICS_AND_DASHES = Pattern.compile("^[a-z0-9-]+$").asPredicate(); @@ -78,8 +75,8 @@ public void runJob(BulkImportJob bulkImportJob, String jobRunId) { if (!validateJob(bulkImportJob)) { return; } - ingestJobStatusStore.jobValidated(ingestJobAccepted( - bulkImportJob.toIngestJob(), validationTimeSupplier.get()) + ingestJobStatusStore.jobValidated(bulkImportJob.toIngestJob() + .acceptedEventBuilder(validationTimeSupplier.get()) .jobRunId(jobRunId).build()); try { LOGGER.info("Writing job with id {} to JSON file", bulkImportJob.getId()); @@ -123,8 +120,8 @@ private boolean validateJob(BulkImportJob bulkImportJob) { String errorMessage = "The bulk import job failed validation with the following checks failing: \n" + String.join("\n", failedChecks); LOGGER.warn(errorMessage); - ingestJobStatusStore.jobValidated(ingestJobRejected( - bulkImportJob.toIngestJob(), validationTimeSupplier.get(), failedChecks)); + ingestJobStatusStore.jobValidated(bulkImportJob.toIngestJob() + .createRejectedEvent(validationTimeSupplier.get(), failedChecks)); return false; } else { return true; diff --git a/java/ingest/ingest-core/src/main/java/sleeper/ingest/core/job/IngestJob.java b/java/ingest/ingest-core/src/main/java/sleeper/ingest/core/job/IngestJob.java index 8fc0c2b3ce..c134df8c0d 100644 --- a/java/ingest/ingest-core/src/main/java/sleeper/ingest/core/job/IngestJob.java +++ b/java/ingest/ingest-core/src/main/java/sleeper/ingest/core/job/IngestJob.java @@ -21,6 +21,7 @@ import sleeper.ingest.core.job.status.IngestJobFailedEvent; import sleeper.ingest.core.job.status.IngestJobFinishedEvent; import sleeper.ingest.core.job.status.IngestJobStartedEvent; +import sleeper.ingest.core.job.status.IngestJobValidatedEvent; import java.time.Instant; import java.util.List; @@ -50,13 +51,28 @@ public static Builder builder() { } /** - * Creates a builder for an event when files have been added to the state store. Used with the ingest job tracker. + * Creates a builder for an event when an ingest job passed validation checks. Used with the ingest job tracker. * - * @param writtenTime the time the files were written - * @return the builder + * @param validationTime the validation time + * @return the builder */ - public IngestJobAddedFilesEvent.Builder addedFilesEventBuilder(Instant writtenTime) { - return IngestJobAddedFilesEvent.builder().jobId(id).tableId(tableId).writtenTime(writtenTime); + public IngestJobValidatedEvent.Builder acceptedEventBuilder(Instant validationTime) { + return validatedEventBuilder(validationTime).reasons(List.of()); + } + + /** + * Creates a builder for an event when an ingest job failed validation checks. Used with the ingest job tracker. + * + * @param validationTime the validation time + * @param reasons the reasons why the validation failed + * @return the builder + */ + public IngestJobValidatedEvent createRejectedEvent(Instant validationTime, List reasons) { + return validatedEventBuilder(validationTime).reasons(reasons).build(); + } + + private IngestJobValidatedEvent.Builder validatedEventBuilder(Instant validationTime) { + return IngestJobValidatedEvent.builder().jobId(id).tableId(tableId).validationTime(validationTime).fileCount(getFileCount()); } /** @@ -94,6 +110,16 @@ public IngestJobStartedEvent.Builder startedAfterValidationEventBuilder(Instant return IngestJobStartedEvent.builder().jobId(id).tableId(tableId).startTime(startTime).fileCount(getFileCount()).startOfRun(false); } + /** + * Creates a builder for an event when files have been added to the state store. Used with the ingest job tracker. + * + * @param writtenTime the time the files were written + * @return the builder + */ + public IngestJobAddedFilesEvent.Builder addedFilesEventBuilder(Instant writtenTime) { + return IngestJobAddedFilesEvent.builder().jobId(id).tableId(tableId).writtenTime(writtenTime); + } + /** * Creates a builder for an event when the job finished. Used with the ingest job tracker. * diff --git a/java/ingest/ingest-core/src/main/java/sleeper/ingest/core/job/status/IngestJobValidatedEvent.java b/java/ingest/ingest-core/src/main/java/sleeper/ingest/core/job/status/IngestJobValidatedEvent.java index aca4fb98cf..cccb135f3b 100644 --- a/java/ingest/ingest-core/src/main/java/sleeper/ingest/core/job/status/IngestJobValidatedEvent.java +++ b/java/ingest/ingest-core/src/main/java/sleeper/ingest/core/job/status/IngestJobValidatedEvent.java @@ -16,8 +16,6 @@ package sleeper.ingest.core.job.status; -import sleeper.ingest.core.job.IngestJob; - import java.time.Instant; import java.util.List; import java.util.Objects; @@ -46,17 +44,6 @@ private IngestJobValidatedEvent(Builder builder) { jsonMessage = builder.jsonMessage; } - /** - * Creates an instance of this class for when an ingest job passed validation checks. - * - * @param job the ingest job - * @param validationTime the validation time - * @return an instance of this class - */ - public static Builder ingestJobAccepted(IngestJob job, Instant validationTime) { - return builder().job(job).validationTime(validationTime).reasons(List.of()); - } - /** * Creates an instance of this class for when an ingest job failed validation checks. * @@ -75,18 +62,6 @@ public static IngestJobValidatedEvent ingestJobRejected(String jobId, String jso .build(); } - /** - * Creates an instance of this class for when an ingest job failed validation checks. - * - * @param job the ingest job - * @param validationTime the validation time - * @param reasons the list of reasons why the validation failed - * @return an instance of this class - */ - public static IngestJobValidatedEvent ingestJobRejected(IngestJob job, Instant validationTime, List reasons) { - return builder().job(job).validationTime(validationTime).reasons(reasons).build(); - } - public static Builder builder() { return new Builder(); } @@ -198,18 +173,6 @@ public static final class Builder { private Builder() { } - /** - * Sets the job ID, table ID, and file count from the ingest job. - * - * @param job the ingest job - * @return the builder - */ - public Builder job(IngestJob job) { - return jobId(job.getId()) - .tableId(job.getTableId()) - .fileCount(job.getFileCount()); - } - /** * Sets the job ID. * diff --git a/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/InMemoryIngestJobStatusStoreTest.java b/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/InMemoryIngestJobStatusStoreTest.java index ed0a4ea70b..a4f62a83f2 100644 --- a/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/InMemoryIngestJobStatusStoreTest.java +++ b/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/InMemoryIngestJobStatusStoreTest.java @@ -54,10 +54,8 @@ import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestFinishedStatusUncommitted; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestStartedStatus; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.jobStatus; -import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.rejectedEvent; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.rejectedRun; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.startedIngestRun; -import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted; public class InMemoryIngestJobStatusStoreTest { @@ -230,7 +228,7 @@ void shouldGetInvalidJobsWithOneRejectedJob() { Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z"); // When - store.jobValidated(rejectedEvent(job, validationTime, "Test validation reason")); + store.jobValidated(job.createRejectedEvent(validationTime, List.of("Test validation reason"))); // Then assertThat(store.getInvalidJobs()) @@ -248,8 +246,8 @@ void shouldGetOneInvalidJobWithOneRejectedJobAndOneAcceptedJob() { Instant validationTime2 = Instant.parse("2022-09-22T12:02:10.000Z"); // When - store.jobValidated(rejectedEvent(job1, validationTime1, "Test validation reason")); - store.jobValidated(ingestJobAccepted(job2, validationTime2).build()); + store.jobValidated(job1.createRejectedEvent(validationTime1, List.of("Test validation reason"))); + store.jobValidated(job2.acceptedEventBuilder(validationTime2).build()); // Then assertThat(store.getInvalidJobs()) @@ -267,8 +265,8 @@ void shouldGetInvalidJobsAcrossMultipleTables() { Instant validationTime2 = Instant.parse("2022-09-22T12:00:31.000Z"); // When - store.jobValidated(rejectedEvent(job1, validationTime1, "Test reason 1")); - store.jobValidated(rejectedEvent(job2, validationTime2, "Test reason 2")); + store.jobValidated(job1.createRejectedEvent(validationTime1, List.of("Test reason 1"))); + store.jobValidated(job2.createRejectedEvent(validationTime2, List.of("Test reason 2"))); // Then assertThat(store.getInvalidJobs()).containsExactly( @@ -285,8 +283,8 @@ void shouldNotGetJobThatWasRejectedThenAccepted() { Instant validationTime2 = Instant.parse("2022-09-22T12:02:10.000Z"); // When - store.jobValidated(rejectedEvent(job1, validationTime1, "Test validation reason")); - store.jobValidated(ingestJobAccepted(job1, validationTime2).build()); + store.jobValidated(job1.createRejectedEvent(validationTime1, List.of("Test validation reason"))); + store.jobValidated(job1.acceptedEventBuilder(validationTime2).build()); // Then assertThat(store.getInvalidJobs()).isEmpty(); @@ -299,7 +297,7 @@ void shouldGetInvalidJobWithNoTable() { Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z"); // When - store.jobValidated(rejectedEvent(job, validationTime, "Test validation reason")); + store.jobValidated(job.createRejectedEvent(validationTime, List.of("Test validation reason"))); // Then assertThat(store.getInvalidJobs()).containsExactly( @@ -318,7 +316,7 @@ void shouldReportUnstartedJobWithNoValidationFailures() { Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z"); // When - store.jobValidated(ingestJobAccepted(job, validationTime).taskId(taskId).build()); + store.jobValidated(job.acceptedEventBuilder(validationTime).taskId(taskId).build()); // Then assertThat(store.getAllJobs(tableId)) @@ -334,7 +332,7 @@ void shouldReportStartedJobWithNoValidationFailures() { Instant startTime = Instant.parse("2022-09-22T12:00:15.000Z"); // When - store.jobValidated(ingestJobAccepted(job, validationTime).taskId(taskId).build()); + store.jobValidated(job.acceptedEventBuilder(validationTime).taskId(taskId).build()); store.jobStarted(job.startedAfterValidationEventBuilder(startTime).taskId(taskId).build()); // Then @@ -350,7 +348,7 @@ void shouldReportJobWithOneValidationFailure() { Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z"); // When - store.jobValidated(rejectedEvent(job, validationTime, "Test validation reason")); + store.jobValidated(job.createRejectedEvent(validationTime, List.of("Test validation reason"))); // Then assertThat(store.getAllJobs(tableId)) @@ -365,8 +363,8 @@ void shouldReportJobWithMultipleValidationFailures() { Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z"); // When - store.jobValidated(rejectedEvent(job, validationTime, - "Test validation reason 1", "Test validation reason 2")); + store.jobValidated(job.createRejectedEvent(validationTime, + List.of("Test validation reason 1", "Test validation reason 2"))); // Then assertThat(store.getAllJobs(tableId)) @@ -385,7 +383,7 @@ void shouldReportAcceptedJob() { Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z"); // When - store.jobValidated(ingestJobAccepted(job, validationTime).jobRunId("test-run").build()); + store.jobValidated(job.acceptedEventBuilder(validationTime).jobRunId("test-run").build()); // Then assertThat(store.getAllJobs(tableId)) @@ -405,7 +403,7 @@ void shouldReportStartedJob() { Instant startTime = Instant.parse("2022-09-22T12:00:15.000Z"); // When - store.jobValidated(ingestJobAccepted(job, validationTime).jobRunId(jobRunId).build()); + store.jobValidated(job.acceptedEventBuilder(validationTime).jobRunId(jobRunId).build()); store.jobStarted(job.startedAfterValidationEventBuilder(startTime).jobRunId(jobRunId).taskId(taskId).build()); // Then @@ -428,7 +426,7 @@ void shouldReportFinishedJob() { RecordsProcessedSummary summary = summary(startTime, Duration.ofMinutes(10), 100L, 100L); // When - store.jobValidated(ingestJobAccepted(job, validationTime).jobRunId(jobRunId).build()); + store.jobValidated(job.acceptedEventBuilder(validationTime).jobRunId(jobRunId).build()); store.jobStarted(job.startedAfterValidationEventBuilder(startTime).jobRunId(jobRunId).taskId(taskId).build()); store.jobFinished(job.finishedEventBuilder(summary).jobRunId(jobRunId).taskId(taskId).numFilesWrittenByJob(2).build()); @@ -474,7 +472,7 @@ void shouldReportFailedJob() { List failureReasons = List.of("Something went wrong"); // When - store.jobValidated(ingestJobAccepted(job, validationTime).jobRunId(jobRunId).build()); + store.jobValidated(job.acceptedEventBuilder(validationTime).jobRunId(jobRunId).build()); store.jobStarted(job.startedAfterValidationEventBuilder(startTime).jobRunId(jobRunId).taskId(taskId).build()); store.jobFailed(job.failedEventBuilder(runTime).jobRunId(jobRunId).taskId(taskId).failureReasons(failureReasons).build()); diff --git a/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/IngestJobStatusTestHelper.java b/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/IngestJobStatusTestHelper.java index 49cab752b5..876d2b9c48 100644 --- a/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/IngestJobStatusTestHelper.java +++ b/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/IngestJobStatusTestHelper.java @@ -499,16 +499,4 @@ public static IngestJobFinishedStatus ingestFinishedStatusUncommitted(IngestJob .build(); } - /** - * Creates an ingest job validated event for a rejected job. - * - * @param job the ingest job - * @param validationTime the validation time - * @param reasons the list of reasons - * @return an ingest job validated event for a rejected job - */ - public static IngestJobValidatedEvent rejectedEvent(IngestJob job, Instant validationTime, String... reasons) { - return IngestJobValidatedEvent.ingestJobRejected(job, validationTime, List.of(reasons)); - } - } diff --git a/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/QueryIngestJobStatusByPeriodIT.java b/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/QueryIngestJobStatusByPeriodIT.java index b2e77d6dce..75ae202b72 100644 --- a/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/QueryIngestJobStatusByPeriodIT.java +++ b/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/QueryIngestJobStatusByPeriodIT.java @@ -23,9 +23,9 @@ import java.time.Instant; import java.time.Period; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; -import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.rejectedEvent; public class QueryIngestJobStatusByPeriodIT extends DynamoDBIngestJobStatusStoreTestBase { @@ -127,7 +127,7 @@ void shouldExcludeRejectedIngestJobFromRangeQueryWhenRejectedTimeIsBeforeStartOf IngestJobStatusStore store = storeWithUpdateTimes(rejectedUpdateTime); // When - store.jobValidated(rejectedEvent(job, rejectedTime, "Test reason")); + store.jobValidated(job.createRejectedEvent(rejectedTime, List.of("Test reason"))); // Then assertThat(store.getJobsInTimePeriod(tableId, periodStart, periodEnd)) diff --git a/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/QueryInvalidIngestJobsIT.java b/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/QueryInvalidIngestJobsIT.java index 25b63a2a73..a9df13f84c 100644 --- a/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/QueryInvalidIngestJobsIT.java +++ b/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/QueryInvalidIngestJobsIT.java @@ -23,12 +23,11 @@ import sleeper.ingest.status.store.testutils.DynamoDBIngestJobStatusStoreTestBase; import java.time.Instant; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.jobStatus; -import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.rejectedEvent; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.rejectedRun; -import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted; public class QueryInvalidIngestJobsIT extends DynamoDBIngestJobStatusStoreTestBase { @Test @@ -40,8 +39,8 @@ public void shouldReturnInvalidIngestJobs() { Instant validationTime2 = Instant.parse("2022-12-14T13:52:12.001Z"); // When - store.jobValidated(rejectedEvent(job1, validationTime1, "Test reason 1")); - store.jobValidated(rejectedEvent(job2, validationTime2, "Test reason 2")); + store.jobValidated(job1.createRejectedEvent(validationTime1, List.of("Test reason 1"))); + store.jobValidated(job2.createRejectedEvent(validationTime2, List.of("Test reason 2"))); // Then assertThat(store.getInvalidJobs()) @@ -74,8 +73,8 @@ public void shouldReturnInvalidIngestJobRejectedTwice() { Instant validationTime2 = Instant.parse("2022-12-14T13:52:12.001Z"); // When - store.jobValidated(rejectedEvent(job, validationTime1, "Test reason 1")); - store.jobValidated(rejectedEvent(job, validationTime2, "Test reason 2")); + store.jobValidated(job.createRejectedEvent(validationTime1, List.of("Test reason 1"))); + store.jobValidated(job.createRejectedEvent(validationTime2, List.of("Test reason 2"))); // Then assertThat(store.getInvalidJobs()) @@ -95,8 +94,8 @@ public void shouldExcludeValidIngestJob() { Instant validationTime2 = Instant.parse("2022-12-14T13:52:12.001Z"); // When - store.jobValidated(ingestJobAccepted(job1, validationTime1).build()); - store.jobValidated(rejectedEvent(job2, validationTime2, "Test reason 2")); + store.jobValidated(job1.acceptedEventBuilder(validationTime1).build()); + store.jobValidated(job2.createRejectedEvent(validationTime2, List.of("Test reason 2"))); // Then assertThat(store.getInvalidJobs()) @@ -113,8 +112,8 @@ void shouldExcludeJobThatWasRejectedThenAccepted() { Instant validationTime2 = Instant.parse("2022-12-14T13:52:12.001Z"); // When - store.jobValidated(rejectedEvent(job, validationTime1, "Test reason 1")); - store.jobValidated(ingestJobAccepted(job, validationTime2).build()); + store.jobValidated(job.createRejectedEvent(validationTime1, List.of("Test reason 1"))); + store.jobValidated(job.acceptedEventBuilder(validationTime2).build()); // Then assertThat(store.getInvalidJobs()).isEmpty(); diff --git a/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/StoreIngestJobRunIdIT.java b/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/StoreIngestJobRunIdIT.java index 6ba2b6965b..6c633eae55 100644 --- a/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/StoreIngestJobRunIdIT.java +++ b/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/StoreIngestJobRunIdIT.java @@ -43,7 +43,6 @@ import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.acceptedRunWhichFinished; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.acceptedRunWhichStarted; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.jobStatus; -import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted; public class StoreIngestJobRunIdIT extends DynamoDBIngestJobStatusStoreTestBase { @Test @@ -53,7 +52,7 @@ void shouldReportAcceptedJob() { Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z"); // When - store.jobValidated(ingestJobAccepted(job, validationTime).jobRunId("test-run").build()); + store.jobValidated(job.acceptedEventBuilder(validationTime).jobRunId("test-run").build()); // Then assertThat(getAllJobStatuses()) @@ -71,7 +70,7 @@ void shouldReportStartedJob() { Instant startTime = Instant.parse("2022-09-22T12:00:15.000Z"); // When - store.jobValidated(ingestJobAccepted(job, validationTime).jobRunId(jobRunId).build()); + store.jobValidated(job.acceptedEventBuilder(validationTime).jobRunId(jobRunId).build()); store.jobStarted(job.startedAfterValidationEventBuilder(startTime).jobRunId(jobRunId).taskId(taskId).build()); // Then @@ -123,7 +122,7 @@ void shouldReportFinishedJob() { RecordsProcessedSummary summary = summary(startTime, Duration.ofMinutes(10), 100L, 100L); // When - store.jobValidated(ingestJobAccepted(job, validationTime).jobRunId(jobRunId).build()); + store.jobValidated(job.acceptedEventBuilder(validationTime).jobRunId(jobRunId).build()); store.jobStarted(job.startedAfterValidationEventBuilder(startTime).jobRunId(jobRunId).taskId(taskId).build()); store.jobFinished(job.finishedEventBuilder(summary).jobRunId(jobRunId).taskId(taskId).numFilesWrittenByJob(2).build()); @@ -146,7 +145,7 @@ void shouldReportFailedJob() { List failureReasons = List.of("Something failed"); // When - store.jobValidated(ingestJobAccepted(job, validationTime).jobRunId(jobRunId).build()); + store.jobValidated(job.acceptedEventBuilder(validationTime).jobRunId(jobRunId).build()); store.jobStarted(job.startedAfterValidationEventBuilder(startTime).jobRunId(jobRunId).taskId(taskId).build()); store.jobFailed(job.failedEventBuilder(runTime).jobRunId(jobRunId).taskId(taskId).failureReasons(failureReasons).build()); diff --git a/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/StoreIngestJobValidatedIT.java b/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/StoreIngestJobValidatedIT.java index 35f87d389a..b539992f2c 100644 --- a/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/StoreIngestJobValidatedIT.java +++ b/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/job/StoreIngestJobValidatedIT.java @@ -29,9 +29,7 @@ import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.acceptedRunOnTask; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.acceptedRunWhichStarted; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.jobStatus; -import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.rejectedEvent; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.rejectedRun; -import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted; public class StoreIngestJobValidatedIT extends DynamoDBIngestJobStatusStoreTestBase { @Test @@ -42,7 +40,7 @@ void shouldReportUnstartedJobWithNoValidationFailures() { Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z"); // When - store.jobValidated(ingestJobAccepted(job, validationTime).taskId(taskId).build()); + store.jobValidated(job.acceptedEventBuilder(validationTime).taskId(taskId).build()); // Then assertThat(getAllJobStatuses()) @@ -59,7 +57,7 @@ void shouldReportStartedJobWithNoValidationFailures() { Instant startTime = Instant.parse("2022-09-22T12:00:15.000Z"); // When - store.jobValidated(ingestJobAccepted(job, validationTime).taskId(taskId).build()); + store.jobValidated(job.acceptedEventBuilder(validationTime).taskId(taskId).build()); store.jobStarted(job.startedAfterValidationEventBuilder(startTime).taskId(taskId).build()); // Then @@ -76,7 +74,7 @@ void shouldReportJobWithOneValidationFailure() { Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z"); // When - store.jobValidated(rejectedEvent(job, validationTime, "Test validation reason")); + store.jobValidated(job.createRejectedEvent(validationTime, List.of("Test validation reason"))); // Then assertThat(getAllJobStatuses()) @@ -92,8 +90,8 @@ void shouldReportJobWithMultipleValidationFailures() { Instant validationTime = Instant.parse("2022-09-22T12:00:10.000Z"); // When - store.jobValidated(rejectedEvent(job, validationTime, - "Test validation reason 1", "Test validation reason 2")); + store.jobValidated(job.createRejectedEvent(validationTime, + List.of("Test validation reason 1", "Test validation reason 2"))); // Then assertThat(getAllJobStatuses()) diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/ingest/DirectEmrServerlessDriver.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/ingest/DirectEmrServerlessDriver.java index 0366e1889f..a6e9efe35c 100644 --- a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/ingest/DirectEmrServerlessDriver.java +++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/ingest/DirectEmrServerlessDriver.java @@ -34,7 +34,6 @@ import java.util.UUID; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.BULK_IMPORT_BUCKET; -import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted; public class DirectEmrServerlessDriver implements DirectBulkImportDriver { private final SystemTestInstanceContext instance; @@ -51,7 +50,7 @@ public DirectEmrServerlessDriver(SystemTestInstanceContext instance, SystemTestC public void sendJob(BulkImportJob job) { String jobRunId = UUID.randomUUID().toString(); - jobStatusStore().jobValidated(ingestJobAccepted(job.toIngestJob(), Instant.now()) + jobStatusStore().jobValidated(job.toIngestJob().acceptedEventBuilder(Instant.now()) .jobRunId(jobRunId).build()); s3Client.putObject(instance.getInstanceProperties().get(BULK_IMPORT_BUCKET), "bulk_import/" + job.getId() + "-" + jobRunId + ".json", diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/util/WaitForJobsStatusTest.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/util/WaitForJobsStatusTest.java index 4b4743dd3e..dbc7d83c18 100644 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/util/WaitForJobsStatusTest.java +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/util/WaitForJobsStatusTest.java @@ -34,7 +34,6 @@ import static sleeper.core.record.process.RecordsProcessedSummaryTestHelper.summary; import static sleeper.core.record.process.status.ProcessStatusUpdateTestHelper.defaultUpdateTime; import static sleeper.ingest.core.job.IngestJobTestData.createJobWithTableAndFiles; -import static sleeper.ingest.core.job.status.IngestJobValidatedEvent.ingestJobAccepted; public class WaitForJobsStatusTest { @@ -48,9 +47,9 @@ void shouldReportSeveralBulkImportJobs() { IngestJob acceptedJob = createJobWithTableAndFiles("accepted-job", table, "test.parquet", "test2.parquet"); IngestJob startedJob = createJobWithTableAndFiles("started-job", table, "test3.parquet", "test4.parquet"); IngestJob finishedJob = createJobWithTableAndFiles("finished-job", table, "test3.parquet", "test4.parquet"); - store.jobValidated(ingestJobAccepted(acceptedJob, Instant.parse("2022-09-22T13:33:10Z")).jobRunId("accepted-run").build()); - store.jobValidated(ingestJobAccepted(startedJob, Instant.parse("2022-09-22T13:33:11Z")).jobRunId("started-run").build()); - store.jobValidated(ingestJobAccepted(finishedJob, Instant.parse("2022-09-22T13:33:12Z")).jobRunId("finished-run").build()); + store.jobValidated(acceptedJob.acceptedEventBuilder(Instant.parse("2022-09-22T13:33:10Z")).jobRunId("accepted-run").build()); + store.jobValidated(startedJob.acceptedEventBuilder(Instant.parse("2022-09-22T13:33:11Z")).jobRunId("started-run").build()); + store.jobValidated(finishedJob.acceptedEventBuilder(Instant.parse("2022-09-22T13:33:12Z")).jobRunId("finished-run").build()); store.jobStarted(startedJob.startedAfterValidationEventBuilder(Instant.parse("2022-09-22T13:33:31Z")).jobRunId("started-run").taskId("started-task").build()); store.jobStarted(finishedJob.startedAfterValidationEventBuilder(Instant.parse("2022-09-22T13:33:32Z")).jobRunId("finished-run").taskId("finished-task").build()); store.jobFinished(finishedJob.finishedEventBuilder(