From a52bc8b35d57173282f0beff42e7b3c85742ad5c Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Wed, 31 May 2023 15:08:26 -0700 Subject: [PATCH] Delete index once it is expired (#326) Signed-off-by: Heemin Kim --- .../UpdateDatasourceTransportAction.java | 48 +++++++++----- .../ip2geo/jobscheduler/Datasource.java | 29 ++++++++- .../ip2geo/jobscheduler/DatasourceRunner.java | 31 ++++++++- .../jobscheduler/DatasourceUpdateService.java | 38 +++++++++-- .../geospatial/ip2geo/Ip2GeoTestCase.java | 23 ++++++- .../UpdateDatasourceTransportActionTests.java | 47 ++++++++++++-- .../jobscheduler/DatasourceRunnerTests.java | 65 ++++++++++++++++++- .../ip2geo/jobscheduler/DatasourceTests.java | 53 ++++++++++++++- .../DatasourceUpdateServiceTests.java | 29 +++++++++ 9 files changed, 326 insertions(+), 37 deletions(-) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java index 7b2ed0bf..ffdb349d 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java @@ -8,6 +8,7 @@ import java.io.IOException; import java.net.URL; import java.security.InvalidParameterException; +import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Locale; @@ -26,6 +27,7 @@ import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.tasks.Task; @@ -97,21 +99,16 @@ protected void doExecute(final Task task, final UpdateDatasourceRequest request, }, exception -> listener.onFailure(exception))); } - private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException { + private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) { boolean isChanged = false; if (isEndpointChanged(request, datasource)) { datasource.setEndpoint(request.getEndpoint()); isChanged = true; } - - if (isUpdateIntervalChanged(request, datasource)) { - datasource.setUserSchedule( - new IntervalSchedule( - datasource.getUserSchedule().getStartTime(), - (int) request.getUpdateInterval().getDays(), - ChronoUnit.DAYS - ) - ); + if (isUpdateIntervalChanged(request)) { + datasource.setUserSchedule(new IntervalSchedule(Instant.now(), (int) request.getUpdateInterval().getDays(), ChronoUnit.DAYS)); + datasource.setSystemSchedule(datasource.getUserSchedule()); + datasource.setTask(DatasourceTask.ALL); isChanged = true; } @@ -138,6 +135,21 @@ private void updateIfChanged(final UpdateDatasourceRequest request, final Dataso private void validate(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException { validateFieldsCompatibility(request, datasource); validateUpdateIntervalIsLessThanValidForInDays(request, datasource); + validateNextUpdateScheduleIsBeforeExpirationDay(request, datasource); + } + + private void validateNextUpdateScheduleIsBeforeExpirationDay(final UpdateDatasourceRequest request, final Datasource datasource) { + if (request.getUpdateInterval() == null) { + return; + } + + IntervalSchedule newSchedule = new IntervalSchedule(Instant.now(), (int) request.getUpdateInterval().getDays(), ChronoUnit.DAYS); + + if (newSchedule.getNextExecutionTime(Instant.now()).isAfter(datasource.expirationDay())) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "datasource will expire at %s with the update interval", datasource.expirationDay().toString()) + ); + } } private void validateFieldsCompatibility(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException { @@ -157,7 +169,7 @@ private void validateFieldsCompatibility(final UpdateDatasourceRequest request, private void validateUpdateIntervalIsLessThanValidForInDays(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException { - if (isEndpointChanged(request, datasource) == false && isUpdateIntervalChanged(request, datasource) == false) { + if (isEndpointChanged(request, datasource) == false && isUpdateIntervalChanged(request) == false) { return; } @@ -165,7 +177,7 @@ private void validateUpdateIntervalIsLessThanValidForInDays(final UpdateDatasour ? DatasourceManifest.Builder.build(new URL(request.getEndpoint())).getValidForInDays() : datasource.getDatabase().getValidForInDays(); - long updateInterval = isUpdateIntervalChanged(request, datasource) + long updateInterval = isUpdateIntervalChanged(request) ? request.getUpdateInterval().days() : datasource.getUserSchedule().getInterval(); @@ -180,8 +192,14 @@ private boolean isEndpointChanged(final UpdateDatasourceRequest request, final D return request.getEndpoint() != null && request.getEndpoint().equals(datasource.getEndpoint()) == false; } - private boolean isUpdateIntervalChanged(final UpdateDatasourceRequest request, final Datasource datasource) { - return request.getUpdateInterval() != null - && (int) request.getUpdateInterval().days() != datasource.getUserSchedule().getInterval(); + /** + * Update interval is changed as long as user provide one because + * start time will get updated even if the update interval is same as current one. + * + * @param request the update datasource request + * @return true if update interval is changed, and false otherwise + */ + private boolean isUpdateIntervalChanged(final UpdateDatasourceRequest request) { + return request.getUpdateInterval() != null; } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java index 8e0d2040..5f1e5a4d 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java @@ -374,13 +374,38 @@ private String indexNameFor(final long suffix) { /** * Checks if datasource is expired or not * - * @return true if datasource is expired false otherwise + * @return true if datasource is expired, and false otherwise */ public boolean isExpired() { + return willExpire(Instant.now()); + } + + /** + * Checks if datasource will expire at given time + * + * @return true if datasource will expired at given time, and false otherwise + */ + public boolean willExpire(Instant instant) { if (database.validForInDays == null) { return false; } + return instant.isAfter(expirationDay()); + } + + /** + * Day when datasource will expire + * + * @return Day when datasource will expire + */ + public Instant expirationDay() { + if (database.validForInDays == null) { + return Instant.MAX; + } + return lastCheckedAt().plus(database.validForInDays, ChronoUnit.DAYS); + } + + private Instant lastCheckedAt() { Instant lastCheckedAt; if (updateStats.lastSkippedAt == null) { lastCheckedAt = updateStats.lastSucceededAt; @@ -389,7 +414,7 @@ public boolean isExpired() { ? updateStats.lastSkippedAt : updateStats.lastSucceededAt; } - return Instant.now().isAfter(lastCheckedAt.plus(database.validForInDays, ChronoUnit.DAYS)); + return lastCheckedAt; } /** diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java index 0ae2953a..8d8938c9 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java @@ -7,6 +7,7 @@ import java.io.IOException; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.concurrent.atomic.AtomicReference; import lombok.extern.log4j.Log4j2; @@ -21,6 +22,7 @@ import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; /** * Datasource update task @@ -29,6 +31,8 @@ */ @Log4j2 public class DatasourceRunner implements ScheduledJobRunner { + private static final int DELETE_INDEX_RETRY_IN_MIN = 15; + private static final int DELETE_INDEX_DELAY_IN_MILLIS = 10000; private static DatasourceRunner INSTANCE; @@ -141,12 +145,37 @@ protected void updateDatasource(final ScheduledJobParameter jobParameter, final try { datasourceUpdateService.deleteUnusedIndices(datasource); - datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock); + if (DatasourceTask.DELETE_UNUSED_INDICES.equals(datasource.getTask()) == false) { + datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock); + } datasourceUpdateService.deleteUnusedIndices(datasource); } catch (Exception e) { log.error("Failed to update datasource for {}", datasource.getName(), e); datasource.getUpdateStats().setLastFailedAt(Instant.now()); datasourceFacade.updateDatasource(datasource); + } finally { + postProcessing(datasource); + } + } + + private void postProcessing(final Datasource datasource) { + if (datasource.isExpired()) { + // Try to delete again as it could have just been expired + datasourceUpdateService.deleteUnusedIndices(datasource); + datasourceUpdateService.updateDatasource(datasource, datasource.getUserSchedule(), DatasourceTask.ALL); + return; + } + + if (datasource.willExpire(datasource.getUserSchedule().getNextExecutionTime(Instant.now()))) { + IntervalSchedule intervalSchedule = new IntervalSchedule( + datasource.expirationDay(), + DELETE_INDEX_RETRY_IN_MIN, + ChronoUnit.MINUTES, + DELETE_INDEX_DELAY_IN_MILLIS + ); + datasourceUpdateService.updateDatasource(datasource, intervalSchedule, DatasourceTask.DELETE_UNUSED_INDICES); + } else { + datasourceUpdateService.updateDatasource(datasource, datasource.getUserSchedule(), DatasourceTask.ALL); } } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java index 4d9530db..0e6e16d1 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java @@ -26,6 +26,7 @@ import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; @Log4j2 public class DatasourceUpdateService { @@ -117,23 +118,46 @@ public List getHeaderFields(String manifestUrl) throws IOException { /** * Delete all indices except the one which are being used * - * @param parameter + * @param datasource */ - public void deleteUnusedIndices(final Datasource parameter) { + public void deleteUnusedIndices(final Datasource datasource) { try { - List indicesToDelete = parameter.getIndices() + List indicesToDelete = datasource.getIndices() .stream() - .filter(index -> index.equals(parameter.currentIndexName()) == false) + .filter(index -> index.equals(datasource.currentIndexName()) == false) .collect(Collectors.toList()); List deletedIndices = deleteIndices(indicesToDelete); if (deletedIndices.isEmpty() == false) { - parameter.getIndices().removeAll(deletedIndices); - datasourceFacade.updateDatasource(parameter); + datasource.getIndices().removeAll(deletedIndices); + datasourceFacade.updateDatasource(datasource); } } catch (Exception e) { - log.error("Failed to delete old indices for {}", parameter.getName(), e); + log.error("Failed to delete old indices for {}", datasource.getName(), e); + } + } + + /** + * Update datasource with given systemSchedule and task + * + * @param datasource datasource to update + * @param systemSchedule new system schedule value + * @param task new task value + */ + public void updateDatasource(final Datasource datasource, final IntervalSchedule systemSchedule, final DatasourceTask task) { + boolean updated = false; + if (datasource.getSystemSchedule().equals(systemSchedule) == false) { + datasource.setSystemSchedule(systemSchedule); + updated = true; + } + if (datasource.getTask().equals(task) == false) { + datasource.setTask(task); + updated = true; + } + + if (updated) { + datasourceFacade.updateDatasource(datasource); } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index a5129b48..fe3783cc 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -186,12 +186,25 @@ protected long randomPositiveLong() { return value < 0 ? -value : value; } - protected Datasource randomDatasource() { - int validForInDays = Randomness.get().nextInt(30); + /** + * Update interval should be > 0 and < validForInDays. + * For an update test to work, there should be at least one eligible value other than current update interval. + * Therefore, the smallest value for validForInDays is 2. + * Update interval is random value from 1 to validForInDays - 2. + * The new update value will be validForInDays - 1. + */ + protected Datasource randomDatasource(final Instant updateStartTime) { + int validForInDays = 3 + Randomness.get().nextInt(30); Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); Datasource datasource = new Datasource(); datasource.setName(GeospatialTestHelper.randomLowerCaseString()); - datasource.setUserSchedule(new IntervalSchedule(now, Randomness.get().nextInt(28) + 1, ChronoUnit.DAYS)); + datasource.setUserSchedule( + new IntervalSchedule( + updateStartTime.truncatedTo(ChronoUnit.MILLIS), + 1 + Randomness.get().nextInt(validForInDays - 2), + ChronoUnit.DAYS + ) + ); datasource.setSystemSchedule(datasource.getUserSchedule()); datasource.setTask(randomTask()); datasource.setState(randomState()); @@ -216,6 +229,10 @@ protected Datasource randomDatasource() { return datasource; } + protected Datasource randomDatasource() { + return randomDatasource(Instant.now()); + } + protected LockModel randomLockModel() { LockModel lockModel = new LockModel( GeospatialTestHelper.randomLowerCaseString(), diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java index 21519e98..3df610e8 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java @@ -15,6 +15,8 @@ import static org.mockito.Mockito.when; import java.security.InvalidParameterException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; import lombok.SneakyThrows; @@ -25,11 +27,11 @@ import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.master.AcknowledgedResponse; -import org.opensearch.common.Randomness; import org.opensearch.common.unit.TimeValue; import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.tasks.Task; @@ -83,11 +85,12 @@ private void validateDoExecuteWithLockError(final Exception exception) { @SneakyThrows public void testDoExecute_whenValidInput_thenUpdate() { - Datasource datasource = randomDatasource(); + Datasource datasource = randomDatasource(Instant.now().minusSeconds(60)); + datasource.setTask(DatasourceTask.DELETE_UNUSED_INDICES); + Instant originalStartTime = datasource.getSchedule().getStartTime(); UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); request.setEndpoint(sampleManifestUrl()); - // Sample manifest has validForDays of 30. Update interval should be less than that. - request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(29))); + request.setUpdateInterval(TimeValue.timeValueDays(datasource.getSchedule().getInterval())); Task task = mock(Task.class); when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); @@ -113,13 +116,14 @@ public void testDoExecute_whenValidInput_thenUpdate() { assertEquals(request.getUpdateInterval().days(), datasource.getUserSchedule().getInterval()); verify(listener).onResponse(new AcknowledgedResponse(true)); verify(ip2GeoLockService).releaseLock(eq(lockModel)); + assertTrue(originalStartTime.isBefore(datasource.getSchedule().getStartTime())); + assertEquals(DatasourceTask.ALL, datasource.getTask()); } @SneakyThrows public void testDoExecute_whenNoChangesInValues_thenNoUpdate() { Datasource datasource = randomDatasource(); UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); - request.setUpdateInterval(TimeValue.timeValueDays(datasource.getUserSchedule().getInterval())); request.setEndpoint(datasource.getEndpoint()); Task task = mock(Task.class); @@ -204,7 +208,7 @@ public void testDoExecute_whenIncompatibleFields_thenError() { } @SneakyThrows - public void testDoExecute_whenInvalidUpdateInterval_thenError() { + public void testDoExecute_whenLargeUpdateInterval_thenError() { Datasource datasource = randomDatasource(); UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); request.setUpdateInterval(TimeValue.timeValueDays(datasource.getDatabase().getValidForInDays())); @@ -231,4 +235,35 @@ public void testDoExecute_whenInvalidUpdateInterval_thenError() { exceptionCaptor.getValue().getMessage().contains("should be smaller"); verify(ip2GeoLockService).releaseLock(eq(lockModel)); } + + @SneakyThrows + public void testDoExecute_whenExpireWithNewUpdateInterval_thenError() { + Datasource datasource = randomDatasource(); + datasource.getUpdateStats().setLastSkippedAt(null); + datasource.getUpdateStats().setLastSucceededAt(Instant.now().minus(datasource.getDatabase().getValidForInDays(), ChronoUnit.DAYS)); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); + request.setUpdateInterval(TimeValue.timeValueDays(1)); + + Task task = mock(Task.class); + when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + ActionListener listener = mock(ActionListener.class); + LockModel lockModel = randomLockModel(); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + // Run + captor.getValue().onResponse(lockModel); + + // Verify + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(exceptionCaptor.capture()); + assertEquals(IllegalArgumentException.class, exceptionCaptor.getValue().getClass()); + exceptionCaptor.getValue().getMessage().contains("will expire"); + verify(ip2GeoLockService).releaseLock(eq(lockModel)); + } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java index 004e1dec..d08dc19a 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java @@ -17,7 +17,9 @@ import static org.opensearch.geospatial.GeospatialTestHelper.randomLowerCaseString; import java.io.IOException; +import java.time.Duration; import java.time.Instant; +import java.time.temporal.ChronoUnit; import lombok.SneakyThrows; @@ -32,6 +34,7 @@ import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; public class DatasourceRunnerTests extends Ip2GeoTestCase { @Before @@ -138,9 +141,63 @@ public void testUpdateDatasource_whenInvalidState_thenUpdateLastFailedAt() { @SneakyThrows public void testUpdateDatasource_whenValidInput_thenSucceed() { - Datasource datasource = new Datasource(); + Datasource datasource = randomDatasource(); + datasource.setState(DatasourceState.AVAILABLE); + when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + Runnable renewLock = mock(Runnable.class); + + // Run + DatasourceRunner.getJobRunnerInstance().updateDatasource(datasource, renewLock); + + // Verify + verify(datasourceUpdateService, times(2)).deleteUnusedIndices(datasource); + verify(datasourceUpdateService).updateOrCreateGeoIpData(datasource, renewLock); + verify(datasourceUpdateService).updateDatasource(datasource, datasource.getUserSchedule(), DatasourceTask.ALL); + } + + @SneakyThrows + public void testUpdateDatasource_whenDeleteTask_thenDeleteOnly() { + Datasource datasource = randomDatasource(); + datasource.setState(DatasourceState.AVAILABLE); + datasource.setTask(DatasourceTask.DELETE_UNUSED_INDICES); + when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + Runnable renewLock = mock(Runnable.class); + + // Run + DatasourceRunner.getJobRunnerInstance().updateDatasource(datasource, renewLock); + + // Verify + verify(datasourceUpdateService, times(2)).deleteUnusedIndices(datasource); + verify(datasourceUpdateService, never()).updateOrCreateGeoIpData(datasource, renewLock); + verify(datasourceUpdateService).updateDatasource(datasource, datasource.getUserSchedule(), DatasourceTask.ALL); + } + + @SneakyThrows + public void testUpdateDatasource_whenExpired_thenDeleteIndicesAgain() { + Datasource datasource = randomDatasource(); + datasource.getUpdateStats().setLastSkippedAt(null); + datasource.getUpdateStats() + .setLastSucceededAt(Instant.now().minus(datasource.getDatabase().getValidForInDays() + 1, ChronoUnit.DAYS)); + datasource.setState(DatasourceState.AVAILABLE); + when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + Runnable renewLock = mock(Runnable.class); + + // Run + DatasourceRunner.getJobRunnerInstance().updateDatasource(datasource, renewLock); + + // Verify + verify(datasourceUpdateService, times(3)).deleteUnusedIndices(datasource); + verify(datasourceUpdateService).updateOrCreateGeoIpData(datasource, renewLock); + verify(datasourceUpdateService).updateDatasource(datasource, datasource.getUserSchedule(), DatasourceTask.ALL); + } + + @SneakyThrows + public void testUpdateDatasource_whenWillExpire_thenScheduleDeleteTask() { + Datasource datasource = randomDatasource(); + datasource.getUpdateStats().setLastSkippedAt(null); + datasource.getUpdateStats() + .setLastSucceededAt(Instant.now().minus(datasource.getDatabase().getValidForInDays(), ChronoUnit.DAYS).plusSeconds(60)); datasource.setState(DatasourceState.AVAILABLE); - datasource.setName(randomLowerCaseString()); when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); Runnable renewLock = mock(Runnable.class); @@ -150,6 +207,10 @@ public void testUpdateDatasource_whenValidInput_thenSucceed() { // Verify verify(datasourceUpdateService, times(2)).deleteUnusedIndices(datasource); verify(datasourceUpdateService).updateOrCreateGeoIpData(datasource, renewLock); + + ArgumentCaptor captor = ArgumentCaptor.forClass(IntervalSchedule.class); + verify(datasourceUpdateService).updateDatasource(eq(datasource), captor.capture(), eq(DatasourceTask.DELETE_UNUSED_INDICES)); + assertTrue(Duration.between(datasource.expirationDay(), captor.getValue().getNextExecutionTime(Instant.now())).getSeconds() < 30); } @SneakyThrows diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java index 49b7961f..23981524 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java @@ -92,7 +92,7 @@ public void testGetIndexNameFor() { ); } - public void testIsExpired() { + public void testIsExpired_whenCalled_thenExpectedValue() { Datasource datasource = new Datasource(); // never expire if validForInDays is null assertFalse(datasource.isExpired()); @@ -111,6 +111,57 @@ public void testIsExpired() { assertFalse(datasource.isExpired()); } + public void testWillExpired_whenCalled_thenExpectedValue() { + Datasource datasource = new Datasource(); + // never expire if validForInDays is null + assertFalse(datasource.willExpire(Instant.MAX)); + + long validForInDays = 1; + datasource.getDatabase().setValidForInDays(validForInDays); + + // if last skipped date is null, use only last succeeded date to determine + datasource.getUpdateStats().setLastSucceededAt(Instant.now().minus(1, ChronoUnit.DAYS)); + assertTrue( + datasource.willExpire(datasource.getUpdateStats().getLastSucceededAt().plus(validForInDays, ChronoUnit.DAYS).plusSeconds(1)) + ); + assertFalse(datasource.willExpire(datasource.getUpdateStats().getLastSucceededAt().plus(validForInDays, ChronoUnit.DAYS))); + + // use the latest date between last skipped date and last succeeded date to determine + datasource.getUpdateStats().setLastSkippedAt(Instant.now()); + assertTrue( + datasource.willExpire(datasource.getUpdateStats().getLastSkippedAt().plus(validForInDays, ChronoUnit.DAYS).plusSeconds(1)) + ); + assertFalse(datasource.willExpire(datasource.getUpdateStats().getLastSkippedAt().plus(validForInDays, ChronoUnit.DAYS))); + + datasource.getUpdateStats().setLastSkippedAt(Instant.now().minus(1, ChronoUnit.HOURS)); + datasource.getUpdateStats().setLastSucceededAt(Instant.now()); + assertTrue( + datasource.willExpire(datasource.getUpdateStats().getLastSucceededAt().plus(validForInDays, ChronoUnit.DAYS).plusSeconds(1)) + ); + assertFalse(datasource.willExpire(datasource.getUpdateStats().getLastSucceededAt().plus(validForInDays, ChronoUnit.DAYS))); + } + + public void testExpirationDay_whenCalled_thenExpectedValue() { + Datasource datasource = new Datasource(); + datasource.getDatabase().setValidForInDays(null); + assertEquals(Instant.MAX, datasource.expirationDay()); + + long validForInDays = 1; + datasource.getDatabase().setValidForInDays(validForInDays); + + // if last skipped date is null, use only last succeeded date to determine + datasource.getUpdateStats().setLastSucceededAt(Instant.now().minus(1, ChronoUnit.DAYS)); + assertEquals(datasource.getUpdateStats().getLastSucceededAt().plus(validForInDays, ChronoUnit.DAYS), datasource.expirationDay()); + + // use the latest date between last skipped date and last succeeded date to determine + datasource.getUpdateStats().setLastSkippedAt(Instant.now()); + assertEquals(datasource.getUpdateStats().getLastSkippedAt().plus(validForInDays, ChronoUnit.DAYS), datasource.expirationDay()); + + datasource.getUpdateStats().setLastSkippedAt(Instant.now().minus(1, ChronoUnit.HOURS)); + datasource.getUpdateStats().setLastSucceededAt(Instant.now()); + assertEquals(datasource.getUpdateStats().getLastSucceededAt().plus(validForInDays, ChronoUnit.DAYS), datasource.expirationDay()); + } + public void testLockDurationSeconds() { Datasource datasource = new Datasource(); assertNotNull(datasource.getLockDurationSeconds()); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java index 108029fe..d8fbc3e0 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java @@ -10,6 +10,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -17,6 +18,7 @@ import java.io.File; import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -33,6 +35,7 @@ import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; @SuppressForbidden(reason = "unit test") public class DatasourceUpdateServiceTests extends Ip2GeoTestCase { @@ -178,6 +181,32 @@ public void testDeleteUnusedIndices_whenValidInput_thenSucceed() { verify(datasourceFacade).updateDatasource(datasource); } + public void testUpdateDatasource_whenNoChange_thenNoUpdate() { + Datasource datasource = randomDatasource(); + + // Run + datasourceUpdateService.updateDatasource(datasource, datasource.getSystemSchedule(), datasource.getTask()); + + // Verify + verify(datasourceFacade, never()).updateDatasource(any()); + } + + public void testUpdateDatasource_whenChange_thenUpdate() { + Datasource datasource = randomDatasource(); + datasource.setTask(DatasourceTask.ALL); + + // Run + datasourceUpdateService.updateDatasource( + datasource, + new IntervalSchedule(Instant.now(), datasource.getSystemSchedule().getInterval() + 1, ChronoUnit.DAYS), + datasource.getTask() + ); + datasourceUpdateService.updateDatasource(datasource, datasource.getSystemSchedule(), DatasourceTask.DELETE_UNUSED_INDICES); + + // Verify + verify(datasourceFacade, times(2)).updateDatasource(any()); + } + @SneakyThrows public void testGetHeaderFields_whenValidInput_thenSucceed() { File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile());