Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kill segments by versions #15994

Merged
merged 21 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/data-management/delete.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ The available grammar is:
"id": <task_id>,
"dataSource": <task_datasource>,
"interval" : <all_unused_segments_in_this_interval_will_die!>,
"versions" : <optional_list_of_segment_versions_to_delete_in_this_interval>,
"context": <task_context>,
"batchSize": <optional_batch_size>,
"limit": <optional_maximum_number_of_segments_to_delete>,
Expand All @@ -106,6 +107,7 @@ Some of the parameters used in the task payload are further explained below:

| Parameter | Default | Explanation |
|-------------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `versions` | null (all versions) | List of segment versions within the specified `interval` for the kill task to delete. The default behavior is to delete all unused segment versions in the specified `interval`.|
| `batchSize` |100 | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations.|
| `limit` | null (no limit) | Maximum number of segments for the kill task to delete.|
| `maxUsedStatusLastUpdatedTime` | null (no cutoff) | Maximum timestamp used as a cutoff to include unused segments. The kill task only considers segments which lie in the specified `interval` and were marked as unused no later than this time. The default behavior is to kill all unused segments in the `interval` regardless of when they where marked as unused.|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,22 @@ public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment
{
private final String dataSource;
private final Interval interval;
private final List<String> versions;
private final Integer limit;
private final DateTime maxUsedStatusLastUpdatedTime;

@JsonCreator
public RetrieveUnusedSegmentsAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("versions") @Nullable List<String> versions,
@JsonProperty("limit") @Nullable Integer limit,
@JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime
)
{
this.dataSource = dataSource;
this.interval = interval;
this.versions = versions;
this.limit = limit;
this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
}
Expand All @@ -64,6 +67,13 @@ public Interval getInterval()
return interval;
}

@Nullable
@JsonProperty
public List<String> getVersions()
{
return versions;
}

@Nullable
@JsonProperty
public Integer getLimit()
Expand All @@ -88,7 +98,7 @@ public TypeReference<List<DataSegment>> getReturnTypeReference()
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUnusedSegmentsForInterval(dataSource, interval, limit, maxUsedStatusLastUpdatedTime);
.retrieveUnusedSegmentsForInterval(dataSource, interval, versions, limit, maxUsedStatusLastUpdatedTime);
}

@Override
Expand All @@ -103,6 +113,7 @@ public String toString()
return getClass().getSimpleName() + "{" +
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", versions=" + versions +
", limit=" + limit +
", maxUsedStatusLastUpdatedTime=" + maxUsedStatusLastUpdatedTime +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null));
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null, null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;

Expand Down Expand Up @@ -83,6 +84,12 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
*/
private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;

/**
* The version of segments to delete in this {@link #getInterval()}.
*/
@Nullable
private final List<String> versions;

@Deprecated
private final boolean markAsUnused;
/**
Expand All @@ -107,6 +114,7 @@ public KillUnusedSegmentsTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("versions") @Nullable List<String> versions,
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
@JsonProperty("batchSize") Integer batchSize,
Expand All @@ -128,13 +136,27 @@ public KillUnusedSegmentsTask(
if (limit != null && limit <= 0) {
throw InvalidInput.exception("limit[%d] must be a positive integer.", limit);
}
if (limit != null && Boolean.TRUE.equals(markAsUnused)) {
throw InvalidInput.exception("limit[%d] cannot be provided when markAsUnused is enabled.", limit);
if (Boolean.TRUE.equals(markAsUnused)) {
if (limit != null) {
throw InvalidInput.exception("limit[%d] cannot be provided when markAsUnused is enabled.", limit);
}
if (!CollectionUtils.isNullOrEmpty(versions)) {
throw InvalidInput.exception("versions[%s] cannot be provided when markAsUnused is enabled.", versions);
}
}
this.versions = versions;
this.limit = limit;
this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
}


@Nullable
@JsonProperty
public List<String> getVersions()
{
return versions;
}

/**
* This field has been deprecated as "kill" tasks should not be responsible for
* marking segments as unused. Instead, users should call the Coordinator API
Expand Down Expand Up @@ -207,13 +229,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
@Nullable Integer numTotalBatches = getNumTotalBatches();
List<DataSegment> unusedSegments;
LOG.info(
"Starting kill for datasource[%s] in interval[%s] with batchSize[%d], up to limit[%d] segments "
+ "before maxUsedStatusLastUpdatedTime[%s] will be deleted%s",
getDataSource(),
getInterval(),
batchSize,
limit,
maxUsedStatusLastUpdatedTime,
"Starting kill for datasource[%s] in interval[%s] and versions[%s] with batchSize[%d], up to limit[%d]"
+ " segments before maxUsedStatusLastUpdatedTime[%s] will be deleted%s",
getDataSource(), getInterval(), getVersions(), batchSize, limit, maxUsedStatusLastUpdatedTime,
numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
);

Expand All @@ -236,11 +254,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
break;
}

unusedSegments = toolbox
.getTaskActionClient()
.submit(
new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize, maxUsedStatusLastUpdatedTime
));
unusedSegments = toolbox.getTaskActionClient().submit(
new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), getVersions(), nextBatchSize, maxUsedStatusLastUpdatedTime)
);

// Fetch locks each time as a revokal could have occurred in between batches
final NavigableMap<DateTime, List<TaskLock>> taskLockMap
Expand All @@ -263,7 +279,6 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception

toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));


// Kill segments from the deep storage only if their load specs are not being used by any used segments
final List<DataSegment> segmentsToBeKilled = unusedSegments
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null));
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null, null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null));
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null, null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
public class RetrieveSegmentsActionsTest
{
private static final Interval INTERVAL = Intervals.of("2017-10-01/2017-10-15");
private static final String UNUSED_V0 = "v0";
private static final String UNUSED_V1 = "v1";

@ClassRule
public static TaskActionTestKit actionTestKit = new TaskActionTestKit();
Expand All @@ -56,9 +58,11 @@ public static void setup() throws IOException
actionTestKit.getTaskLockbox().add(task);

expectedUnusedSegments = new HashSet<>();
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-05/2017-10-06"), "1"));
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-06/2017-10-07"), "1"));
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), "1"));
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-05/2017-10-06"), UNUSED_V0));
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-06/2017-10-07"), UNUSED_V0));
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), UNUSED_V0));
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-06/2017-10-07"), UNUSED_V1));
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), UNUSED_V1));

actionTestKit.getMetadataStorageCoordinator()
.commitSegments(expectedUnusedSegments);
Expand Down Expand Up @@ -88,7 +92,7 @@ private static DataSegment createSegment(Interval interval, String version)
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("met1", "met2"),
NoneShardSpec.instance(),
Integer.valueOf(version),
9,
1
);
}
Expand All @@ -98,31 +102,37 @@ public void testRetrieveUsedSegmentsAction()
{
final RetrieveUsedSegmentsAction action =
new RetrieveUsedSegmentsAction(task.getDataSource(), ImmutableList.of(INTERVAL));
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUsedSegments, resultSegments);
final Set<DataSegment> observedUsedSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUsedSegments, observedUsedSegments);
}

@Test
public void testRetrieveUnusedSegmentsAction()
public void testRetrieveUnusedSegmentsActionWithVersions()
{
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, null);
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUnusedSegments, resultSegments);
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(
task.getDataSource(),
INTERVAL,
ImmutableList.of(UNUSED_V0, UNUSED_V1),
null,
null
);
final Set<DataSegment> observedUnusedSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUnusedSegments, observedUnusedSegments);
}

@Test
public void testRetrieveUnusedSegmentsActionWithMinUsedLastUpdatedTime()
{
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, DateTimes.MIN);
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(ImmutableSet.of(), resultSegments);
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, null, DateTimes.MIN);
final Set<DataSegment> observedUnusedSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(ImmutableSet.of(), observedUnusedSegments);
}

@Test
public void testRetrieveUnusedSegmentsActionWithNowUsedLastUpdatedTime()
{
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, DateTimes.nowUtc());
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUnusedSegments, resultSegments);
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, null, DateTimes.nowUtc());
final Set<DataSegment> observedUnusedSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUnusedSegments, observedUnusedSegments);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.jackson.DefaultObjectMapper;
Expand Down Expand Up @@ -52,6 +53,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro
"killTaskId",
"datasource",
Intervals.of("2020-01-01/P1D"),
null,
false,
99,
5,
Expand All @@ -62,6 +64,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro
Assert.assertEquals(taskQuery.getId(), fromJson.getId());
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertNull(taskQuery.getVersions());
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize()));
Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit());
Expand All @@ -75,6 +78,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault
"killTaskId",
"datasource",
Intervals.of("2020-01-01/P1D"),
null,
true,
null,
null,
Expand All @@ -85,6 +89,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault
Assert.assertEquals(taskQuery.getId(), fromJson.getId());
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertNull(taskQuery.getVersions());
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(100, fromJson.getBatchSize());
Assert.assertNull(taskQuery.getLimit());
Expand All @@ -99,6 +104,7 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
"datasource",
Intervals.of("2020-01-01/P1D"),
null,
null,
true,
99,
null,
Expand All @@ -112,10 +118,11 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
Assert.assertEquals(task.getId(), taskQuery.getId());
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertNull(taskQuery.getVersions());
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
Assert.assertNull(task.getLimit());
Assert.assertNull(task.getMaxUsedStatusLastUpdatedTime());
Assert.assertNull(taskQuery.getLimit());
Assert.assertNull(taskQuery.getMaxUsedStatusLastUpdatedTime());
}

@Test
Expand All @@ -125,6 +132,7 @@ public void testKillUnusedSegmentsTaskWithNonNullValuesToClientKillUnusedSegment
null,
"datasource",
Intervals.of("2020-01-01/P1D"),
ImmutableList.of("v1", "v2"),
null,
null,
99,
Expand All @@ -139,6 +147,7 @@ public void testKillUnusedSegmentsTaskWithNonNullValuesToClientKillUnusedSegment
Assert.assertEquals(task.getId(), taskQuery.getId());
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertEquals(task.getVersions(), taskQuery.getVersions());
Assert.assertNull(taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
Assert.assertEquals(task.getLimit(), taskQuery.getLimit());
Expand Down
Loading
Loading