Skip to content

Commit

Permalink
Resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Sep 4, 2024
1 parent 83d3687 commit c85ed20
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public boolean autoRefresh() {
return Boolean.parseBoolean(getOption(AUTO_REFRESH).orElse("false"));
}

public boolean isExternalScheduler() {
public boolean useSparkScheduler() {
// Default is false, which means using internal scheduler to refresh the index.
return getOption(SCHEDULER_MODE).map(mode -> "external".equals(mode)).orElse(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void runOp(
"Running alter index operation for index: {}", flintIndexMetadata.getOpensearchIndexName());
this.flintIndexMetadataService.updateIndexToManualRefresh(
flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext);
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
if (flintIndexMetadata.getFlintIndexOptions().useSparkScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
} else {
cancelStreamingJob(flintIndexStateModel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void runOp(
LOG.debug(
"Performing drop index operation for index: {}",
flintIndexMetadata.getOpensearchIndexName());
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
if (flintIndexMetadata.getFlintIndexOptions().useSparkScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
} else {
cancelStreamingJob(flintIndexStateModel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void runOp(
FlintIndexStateModel flintIndex,
AsyncQueryRequestContext asyncQueryRequestContext) {
LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName());
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
if (flintIndexMetadata.getFlintIndexOptions().useSparkScheduler()) {
asyncQueryScheduler.removeJob(flintIndexMetadata.getOpensearchIndexName());
}
flintIndexClient.deleteIndex(flintIndexMetadata.getOpensearchIndexName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,53 @@
/** Scheduler interface for scheduling asynchronous query jobs. */
public interface AsyncQueryScheduler {

/** Schedules a new job. */
/**
* Schedules a new job in the system. This method creates a new job entry based on the provided
* request parameters.
*
* <p>Use cases: - Creating a new periodic query execution - Setting up a scheduled data refresh
* task
*
* @param asyncQuerySchedulerRequest The request containing job configuration details
* @throws IllegalArgumentException if a job with the same name already exists
* @throws RuntimeException if there's an error during job creation
*/
void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);

/** Updates an existing job with new parameters. */
/**
* Updates an existing job with new parameters. This method modifies the configuration of an
* already scheduled job.
*
* <p>Use cases: - Changing the schedule of an existing job - Modifying query parameters of a
* scheduled job - Updating resource allocations for a job
*
* @param asyncQuerySchedulerRequest The request containing updated job configuration
* @throws IllegalArgumentException if the job to be updated doesn't exist
* @throws RuntimeException if there's an error during the update process
*/
void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);

/** Unschedules a job by marking it as disabled and updating its last update time. */
/**
* Unschedules a job by marking it as disabled and updating its last update time. This method is
* used when you want to temporarily stop a job from running but keep its configuration and
* history in the system.
*
* <p>Use cases: - Pausing a job that's causing issues without losing its configuration -
* Temporarily disabling a job during maintenance or high-load periods - Allowing for easy
* re-enabling of the job in the future
*
* @param jobId The unique identifier of the job to unschedule
*/
void unscheduleJob(String jobId);

/** Removes a job. */
/**
* Removes a job completely from the scheduler. This method permanently deletes the job and all
* its associated data from the system.
*
* <p>Use cases: - Cleaning up jobs that are no longer needed - Removing obsolete or erroneously
* created jobs - Freeing up resources by deleting unused job configurations
*
* @param jobId The unique identifier of the job to remove
*/
void removeJob(String jobId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ public void createDropIndexQueryWithScheduler() {
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID);

// Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler
verify(asyncQueryScheduler).unscheduleJob(indexName);
}

Expand Down Expand Up @@ -274,12 +273,10 @@ public void createVacuumIndexQueryWithScheduler() {
assertNull(response.getSessionId());
verifyGetQueryIdCalled();

// Verifying that the index is deleted
verify(flintIndexClient).deleteIndex(indexName);
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID);

// Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler
verify(asyncQueryScheduler).removeJob(indexName);
}

Expand Down Expand Up @@ -342,7 +339,6 @@ public void createAlterIndexQueryWithScheduler() {
FlintIndexOptions flintIndexOptions = flintIndexOptionsArgumentCaptor.getValue();
assertFalse(flintIndexOptions.autoRefresh());

// Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler
verify(asyncQueryScheduler).unscheduleJob(indexName);

verifyCreateIndexDMLResultCalled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public void setUp() {
asyncQueryScheduler);
}

// Helper method to create FlintIndexMetadata with latest ID
private static FlintIndexMetadata createFlintIndexMetadataWithLatestId() {
return FlintIndexMetadata.builder()
.latestId(LATEST_ID)
Expand All @@ -72,15 +71,13 @@ private static FlintIndexMetadata createFlintIndexMetadataWithLatestId() {
.build();
}

// Helper method to create FlintIndexMetadata without latest ID
private static FlintIndexMetadata createFlintIndexMetadataWithoutLatestId() {
return FlintIndexMetadata.builder()
.opensearchIndexName(INDEX_NAME)
.flintIndexOptions(new FlintIndexOptions())
.build();
}

// Helper method to create FlintIndexMetadata with external scheduler
private FlintIndexMetadata createFlintIndexMetadataWithExternalScheduler() {
FlintIndexOptions flintIndexOptions = new FlintIndexOptions();
flintIndexOptions.setOption(FlintIndexOptions.SCHEDULER_MODE, "external");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,14 @@ public void setup() {

@Test
public void testParseValidScheduleString() {
String scheduleStr = "5 minutes";
Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);

assertEquals(new IntervalSchedule(startTime, 5, ChronoUnit.MINUTES), schedule);
verifyParseSchedule(5, "5 minutes");
}

@Test
public void testParseValidScheduleStringWithDifferentUnits() {
String scheduleStr = "2 hours";
Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);

assertEquals(new IntervalSchedule(startTime, 120, ChronoUnit.MINUTES), schedule);

scheduleStr = "1 day";
schedule = IntervalScheduleParser.parse(scheduleStr, startTime);

assertEquals(new IntervalSchedule(startTime, 1440, ChronoUnit.MINUTES), schedule);

scheduleStr = "3 weeks";
schedule = IntervalScheduleParser.parse(scheduleStr, startTime);

assertEquals(new IntervalSchedule(startTime, 30240, ChronoUnit.MINUTES), schedule);
verifyParseSchedule(120, "2 hours");
verifyParseSchedule(1440, "1 day");
verifyParseSchedule(30240, "3 weeks");
}

@Test
Expand All @@ -61,7 +47,6 @@ public void testParseNullSchedule() {
public void testParseScheduleObject() {
IntervalSchedule expectedSchedule = new IntervalSchedule(startTime, 10, ChronoUnit.MINUTES);
Schedule schedule = IntervalScheduleParser.parse(expectedSchedule, startTime);

assertEquals(expectedSchedule, schedule);
}

Expand All @@ -79,21 +64,15 @@ public void testParseInvalidScheduleString() {

@Test
public void testParseUnsupportedUnits() {
IllegalArgumentException exception =
assertThrows(
IllegalArgumentException.class,
() -> IntervalScheduleParser.parse("1 year", startTime),
"Expected IllegalArgumentException but no exception was thrown");

assertEquals("Years cannot be converted to minutes accurately.", exception.getMessage());

exception =
assertThrows(
IllegalArgumentException.class,
() -> IntervalScheduleParser.parse("1 month", startTime),
"Expected IllegalArgumentException but no exception was thrown");

assertEquals("Months cannot be converted to minutes accurately.", exception.getMessage());
assertThrows(
IllegalArgumentException.class,
() -> IntervalScheduleParser.parse("1 year", startTime),
"Expected IllegalArgumentException but no exception was thrown");

assertThrows(
IllegalArgumentException.class,
() -> IntervalScheduleParser.parse("1 month", startTime),
"Expected IllegalArgumentException but no exception was thrown");
}

@Test
Expand All @@ -110,44 +89,34 @@ public void testParseNonStringSchedule() {

@Test
public void testParseScheduleWithNanoseconds() {
String scheduleStr = "60000000000 nanoseconds"; // Equivalent to 1 minute
Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);

assertEquals(new IntervalSchedule(startTime, 1, ChronoUnit.MINUTES), schedule);
verifyParseSchedule(1, "60000000000 nanoseconds");
}

@Test
public void testParseScheduleWithMilliseconds() {
String scheduleStr = "60000 milliseconds"; // Equivalent to 1 minute
Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);

assertEquals(new IntervalSchedule(startTime, 1, ChronoUnit.MINUTES), schedule);
verifyParseSchedule(1, "60000 milliseconds");
}

@Test
public void testParseScheduleWithMicroseconds() {
String scheduleStr = "60000000 microseconds"; // Equivalent to 1 minute
Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);

assertEquals(new IntervalSchedule(startTime, 1, ChronoUnit.MINUTES), schedule);
verifyParseSchedule(1, "60000000 microseconds");
}

@Test
public void testUnsupportedTimeUnit() {
IllegalArgumentException exception =
assertThrows(
IllegalArgumentException.class,
() -> IntervalScheduleParser.convertToSupportedUnit(10, "unsupportedunit"),
"Expected IllegalArgumentException but no exception was thrown");

assertEquals("Unsupported time unit: unsupportedunit", exception.getMessage());
assertThrows(
IllegalArgumentException.class,
() -> IntervalScheduleParser.convertToSupportedUnit(10, "unsupportedunit"),
"Expected IllegalArgumentException but no exception was thrown");
}

@Test
public void testParseScheduleWithSeconds() {
String scheduleStr = "120 seconds"; // Equivalent to 2 minutes
Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);
verifyParseSchedule(2, "120 seconds");
}

assertEquals(new IntervalSchedule(startTime, 2, ChronoUnit.MINUTES), schedule);
private void verifyParseSchedule(int expectedMinutes, String scheduleStr) {
Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime);
assertEquals(new IntervalSchedule(startTime, expectedMinutes, ChronoUnit.MINUTES), schedule);
}
}

0 comments on commit c85ed20

Please sign in to comment.