Skip to content

Commit

Permalink
Delete index once it is expired (opensearch-project#326)
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <heemin@amazon.com>
  • Loading branch information
heemin32 committed Jul 13, 2023
1 parent 7381317 commit 0dfbe7d
Show file tree
Hide file tree
Showing 9 changed files with 326 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.IOException;
import java.net.URL;
import java.security.InvalidParameterException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Locale;
Expand All @@ -26,6 +27,7 @@
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.tasks.Task;
Expand Down Expand Up @@ -97,21 +99,16 @@ protected void doExecute(final Task task, final UpdateDatasourceRequest request,
}, exception -> listener.onFailure(exception)));
}

private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException {
private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) {
boolean isChanged = false;
if (isEndpointChanged(request, datasource)) {
datasource.setEndpoint(request.getEndpoint());
isChanged = true;
}

if (isUpdateIntervalChanged(request, datasource)) {
datasource.setUserSchedule(
new IntervalSchedule(
datasource.getUserSchedule().getStartTime(),
(int) request.getUpdateInterval().getDays(),
ChronoUnit.DAYS
)
);
if (isUpdateIntervalChanged(request)) {
datasource.setUserSchedule(new IntervalSchedule(Instant.now(), (int) request.getUpdateInterval().getDays(), ChronoUnit.DAYS));
datasource.setSystemSchedule(datasource.getUserSchedule());
datasource.setTask(DatasourceTask.ALL);
isChanged = true;
}

Expand All @@ -138,6 +135,21 @@ private void updateIfChanged(final UpdateDatasourceRequest request, final Dataso
/**
 * Runs every validation for an update request against the existing datasource.
 *
 * The checks run in a fixed order: field compatibility, then update interval
 * versus the valid-for period, then next update schedule versus expiration day.
 * The first failing check aborts the remaining ones.
 *
 * @param request the update datasource request
 * @param datasource the datasource the request is applied to
 * @throws IOException declared by the field compatibility and update interval checks
 */
private void validate(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException {
validateFieldsCompatibility(request, datasource);
validateUpdateIntervalIsLessThanValidForInDays(request, datasource);
validateNextUpdateScheduleIsBeforeExpirationDay(request, datasource);
}

/**
 * Rejects an update interval whose first scheduled run would land after the
 * datasource expiration day.
 *
 * Nothing is validated when the request carries no update interval.
 *
 * @param request the update datasource request
 * @param datasource the datasource whose expiration day is compared against
 * @throws IllegalArgumentException if the next execution time of the new schedule is after the expiration day
 */
private void validateNextUpdateScheduleIsBeforeExpirationDay(final UpdateDatasourceRequest request, final Datasource datasource) {
    if (request.getUpdateInterval() == null) {
        return;
    }

    // Build the schedule exactly as it would be stored: starting now, repeating every N days.
    final Instant scheduleStart = Instant.now();
    final int intervalInDays = (int) request.getUpdateInterval().getDays();
    final IntervalSchedule candidateSchedule = new IntervalSchedule(scheduleStart, intervalInDays, ChronoUnit.DAYS);

    final Instant nextRun = candidateSchedule.getNextExecutionTime(Instant.now());
    if (nextRun.isAfter(datasource.expirationDay()) == false) {
        return;
    }

    final String message = String.format(
        Locale.ROOT,
        "datasource will expire at %s with the update interval",
        datasource.expirationDay().toString()
    );
    throw new IllegalArgumentException(message);
}

private void validateFieldsCompatibility(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException {
Expand All @@ -157,15 +169,15 @@ private void validateFieldsCompatibility(final UpdateDatasourceRequest request,

private void validateUpdateIntervalIsLessThanValidForInDays(final UpdateDatasourceRequest request, final Datasource datasource)
throws IOException {
if (isEndpointChanged(request, datasource) == false && isUpdateIntervalChanged(request, datasource) == false) {
if (isEndpointChanged(request, datasource) == false && isUpdateIntervalChanged(request) == false) {
return;
}

long validForInDays = isEndpointChanged(request, datasource)
? DatasourceManifest.Builder.build(new URL(request.getEndpoint())).getValidForInDays()
: datasource.getDatabase().getValidForInDays();

long updateInterval = isUpdateIntervalChanged(request, datasource)
long updateInterval = isUpdateIntervalChanged(request)
? request.getUpdateInterval().days()
: datasource.getUserSchedule().getInterval();

Expand All @@ -180,8 +192,14 @@ private boolean isEndpointChanged(final UpdateDatasourceRequest request, final D
return request.getEndpoint() != null && request.getEndpoint().equals(datasource.getEndpoint()) == false;
}

private boolean isUpdateIntervalChanged(final UpdateDatasourceRequest request, final Datasource datasource) {
return request.getUpdateInterval() != null
&& (int) request.getUpdateInterval().days() != datasource.getUserSchedule().getInterval();
/**
 * Tells whether the request should be treated as changing the update interval.
 *
 * Any request that supplies an update interval counts as a change, even when the
 * value equals the current one, because applying it also resets the schedule start time.
 *
 * @param request the update datasource request
 * @return true if the request provides an update interval, and false otherwise
 */
private boolean isUpdateIntervalChanged(final UpdateDatasourceRequest request) {
    final boolean intervalProvided = request.getUpdateInterval() != null;
    return intervalProvided;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,38 @@ private String indexNameFor(final long suffix) {
/**
* Checks if datasource is expired or not
*
* @return true if datasource is expired false otherwise
* @return true if datasource is expired, and false otherwise
*/
public boolean isExpired() {
    // Expiration is evaluated against the current time.
    final Instant now = Instant.now();
    return willExpire(now);
}

/**
 * Checks whether the datasource will be expired at the given time.
 *
 * A datasource whose database has no valid-for period never expires.
 *
 * @param instant the point in time to check against the expiration day
 * @return true if the given time is after the expiration day, and false otherwise
 */
public boolean willExpire(Instant instant) {
    final boolean hasValidityPeriod = database.validForInDays != null;
    return hasValidityPeriod && instant.isAfter(expirationDay());
}

/**
 * Computes the point in time at which the datasource expires.
 *
 * The expiration day is the last checked time plus the database valid-for period in days.
 *
 * @return the expiration time, or {@link Instant#MAX} when the database has no valid-for period
 */
public Instant expirationDay() {
    return database.validForInDays == null
        ? Instant.MAX
        : lastCheckedAt().plus(database.validForInDays, ChronoUnit.DAYS);
}

private Instant lastCheckedAt() {
Instant lastCheckedAt;
if (updateStats.lastSkippedAt == null) {
lastCheckedAt = updateStats.lastSucceededAt;
Expand All @@ -389,7 +414,7 @@ public boolean isExpired() {
? updateStats.lastSkippedAt
: updateStats.lastSucceededAt;
}
return Instant.now().isAfter(lastCheckedAt.plus(database.validForInDays, ChronoUnit.DAYS));
return lastCheckedAt;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicReference;

import lombok.extern.log4j.Log4j2;
Expand All @@ -21,6 +22,7 @@
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;

/**
* Datasource update task
Expand All @@ -29,6 +31,8 @@
*/
@Log4j2
public class DatasourceRunner implements ScheduledJobRunner {
private static final int DELETE_INDEX_RETRY_IN_MIN = 15;
private static final int DELETE_INDEX_DELAY_IN_MILLIS = 10000;

private static DatasourceRunner INSTANCE;

Expand Down Expand Up @@ -141,12 +145,37 @@ protected void updateDatasource(final ScheduledJobParameter jobParameter, final

try {
datasourceUpdateService.deleteUnusedIndices(datasource);
datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock);
if (DatasourceTask.DELETE_UNUSED_INDICES.equals(datasource.getTask()) == false) {
datasourceUpdateService.updateOrCreateGeoIpData(datasource, renewLock);
}
datasourceUpdateService.deleteUnusedIndices(datasource);
} catch (Exception e) {
log.error("Failed to update datasource for {}", datasource.getName(), e);
datasource.getUpdateStats().setLastFailedAt(Instant.now());
datasourceFacade.updateDatasource(datasource);
} finally {
postProcessing(datasource);
}
}

/**
 * Decides the system schedule and task for the next run of the datasource.
 *
 * If the datasource is already expired, unused indices are deleted once more and the
 * datasource goes back to its user schedule with the ALL task. If it is not expired yet
 * but will be by the next user-scheduled run, an index deletion run is scheduled
 * starting at the expiration day. Otherwise the user schedule with the ALL task is kept.
 *
 * @param datasource the datasource that has just been processed
 */
private void postProcessing(final Datasource datasource) {
    if (datasource.isExpired() == false) {
        final Instant nextUserRun = datasource.getUserSchedule().getNextExecutionTime(Instant.now());
        if (datasource.willExpire(nextUserRun)) {
            // Expiration comes before the next regular update: switch to a deletion-only schedule.
            final IntervalSchedule deletionSchedule = new IntervalSchedule(
                datasource.expirationDay(),
                DELETE_INDEX_RETRY_IN_MIN,
                ChronoUnit.MINUTES,
                DELETE_INDEX_DELAY_IN_MILLIS
            );
            datasourceUpdateService.updateDatasource(datasource, deletionSchedule, DatasourceTask.DELETE_UNUSED_INDICES);
            return;
        }
    } else {
        // Try to delete again as it could have just been expired
        datasourceUpdateService.deleteUnusedIndices(datasource);
    }

    datasourceUpdateService.updateDatasource(datasource, datasource.getUserSchedule(), DatasourceTask.ALL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;

@Log4j2
public class DatasourceUpdateService {
Expand Down Expand Up @@ -117,23 +118,46 @@ public List<String> getHeaderFields(String manifestUrl) throws IOException {
/**
* Delete all indices except the one which is being used
*
* @param parameter
* @param datasource
*/
public void deleteUnusedIndices(final Datasource parameter) {
public void deleteUnusedIndices(final Datasource datasource) {
try {
List<String> indicesToDelete = parameter.getIndices()
List<String> indicesToDelete = datasource.getIndices()
.stream()
.filter(index -> index.equals(parameter.currentIndexName()) == false)
.filter(index -> index.equals(datasource.currentIndexName()) == false)
.collect(Collectors.toList());

List<String> deletedIndices = deleteIndices(indicesToDelete);

if (deletedIndices.isEmpty() == false) {
parameter.getIndices().removeAll(deletedIndices);
datasourceFacade.updateDatasource(parameter);
datasource.getIndices().removeAll(deletedIndices);
datasourceFacade.updateDatasource(datasource);
}
} catch (Exception e) {
log.error("Failed to delete old indices for {}", parameter.getName(), e);
log.error("Failed to delete old indices for {}", datasource.getName(), e);
}
}

/**
 * Applies the given system schedule and task to the datasource, persisting it
 * only when at least one of the two values actually differs from the current one.
 *
 * @param datasource datasource to update
 * @param systemSchedule system schedule the datasource should end up with
 * @param task task the datasource should end up with
 */
public void updateDatasource(final Datasource datasource, final IntervalSchedule systemSchedule, final DatasourceTask task) {
    final boolean scheduleChanged = datasource.getSystemSchedule().equals(systemSchedule) == false;
    if (scheduleChanged) {
        datasource.setSystemSchedule(systemSchedule);
    }

    final boolean taskChanged = datasource.getTask().equals(task) == false;
    if (taskChanged) {
        datasource.setTask(task);
    }

    // Skip the write entirely when nothing was modified.
    if (scheduleChanged || taskChanged) {
        datasourceFacade.updateDatasource(datasource);
    }
}

Expand Down
23 changes: 20 additions & 3 deletions src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,25 @@ protected long randomPositiveLong() {
return value < 0 ? -value : value;
}

protected Datasource randomDatasource() {
int validForInDays = Randomness.get().nextInt(30);
/**
* Update interval should be > 0 and < validForInDays.
* For an update test to work, there should be at least one eligible value other than current update interval.
* Therefore, the smallest value for validForInDays is 3.
* Update interval is random value from 1 to validForInDays - 2.
* The new update value will be validForInDays - 1.
*/
protected Datasource randomDatasource(final Instant updateStartTime) {
int validForInDays = 3 + Randomness.get().nextInt(30);
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Datasource datasource = new Datasource();
datasource.setName(GeospatialTestHelper.randomLowerCaseString());
datasource.setUserSchedule(new IntervalSchedule(now, Randomness.get().nextInt(28) + 1, ChronoUnit.DAYS));
datasource.setUserSchedule(
new IntervalSchedule(
updateStartTime.truncatedTo(ChronoUnit.MILLIS),
1 + Randomness.get().nextInt(validForInDays - 2),
ChronoUnit.DAYS
)
);
datasource.setSystemSchedule(datasource.getUserSchedule());
datasource.setTask(randomTask());
datasource.setState(randomState());
Expand All @@ -216,6 +229,10 @@ protected Datasource randomDatasource() {
return datasource;
}

/**
 * Creates a random datasource whose user schedule starts at the current time.
 *
 * @return a randomly populated datasource
 */
protected Datasource randomDatasource() {
    final Instant updateStartTime = Instant.now();
    return randomDatasource(updateStartTime);
}

protected LockModel randomLockModel() {
LockModel lockModel = new LockModel(
GeospatialTestHelper.randomLowerCaseString(),
Expand Down
Loading

0 comments on commit 0dfbe7d

Please sign in to comment.