diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md index fccb14007b94..c444a2507333 100644 --- a/docs/data-management/delete.md +++ b/docs/data-management/delete.md @@ -95,6 +95,7 @@ The available grammar is: "id": , "dataSource": , "interval" : , + "versions" : , "context": , "batchSize": , "limit": , @@ -106,6 +107,7 @@ Some of the parameters used in the task payload are further explained below: | Parameter | Default | Explanation | |-------------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `versions` | null (all versions) | List of segment versions within the specified `interval` for the kill task to delete. The default behavior is to delete all unused segment versions in the specified `interval`.| | `batchSize` |100 | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations.| | `limit` | null (no limit) | Maximum number of segments for the kill task to delete.| | `maxUsedStatusLastUpdatedTime` | null (no cutoff) | Maximum timestamp used as a cutoff to include unused segments. The kill task only considers segments which lie in the specified `interval` and were marked as unused no later than this time. 
The default behavior is to kill all unused segments in the `interval` regardless of when they were marked as unused.| diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java index 8b474153e181..3f8d4725835a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java @@ -35,6 +35,7 @@ public class RetrieveUnusedSegmentsAction implements TaskAction versions; private final Integer limit; private final DateTime maxUsedStatusLastUpdatedTime; @@ -42,12 +43,14 @@ public class RetrieveUnusedSegmentsAction implements TaskAction versions, @JsonProperty("limit") @Nullable Integer limit, @JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime ) { this.dataSource = dataSource; this.interval = interval; + this.versions = versions; this.limit = limit; this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime; } @@ -64,6 +67,13 @@ public Interval getInterval() return interval; } + @Nullable + @JsonProperty + public List getVersions() + { + return versions; + } + @Nullable @JsonProperty public Integer getLimit() @@ -88,7 +98,7 @@ public TypeReference> getReturnTypeReference() public List perform(Task task, TaskActionToolbox toolbox) { return toolbox.getIndexerMetadataStorageCoordinator() - .retrieveUnusedSegmentsForInterval(dataSource, interval, limit, maxUsedStatusLastUpdatedTime); + .retrieveUnusedSegmentsForInterval(dataSource, interval, versions, limit, maxUsedStatusLastUpdatedTime); } @Override @@ -103,6 +113,7 @@ public String toString() return getClass().getSimpleName() + "{" + "dataSource='" + dataSource + '\'' + ", interval=" + interval + + ", versions=" + versions + ", limit=" + limit + ", maxUsedStatusLastUpdatedTime=" + maxUsedStatusLastUpdatedTime + '}'; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java index 957317ad93cd..9104e2cfb05b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java @@ -79,7 +79,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception // List unused segments final List unusedSegments = toolbox .getTaskActionClient() - .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null)); + .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null, null)); // Verify none of these segments have versions > lock version for (final DataSegment unusedSegment : unusedSegments) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index 9d4ab1edf497..8c61a6b2184a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -45,6 +45,7 @@ import org.apache.druid.java.util.common.logger.Logger; import
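As a concrete illustration of the `versions` field documented above, a kill task spec that deletes only specific unused segment versions could look like the following sketch. The datasource, interval, and version values are placeholders, and the payload assumes the standard `kill` task type; fields not shown keep their defaults.

```json
{
  "type": "kill",
  "id": "kill_wikipedia_january",
  "dataSource": "wikipedia",
  "interval": "2023-01-01T00:00:00.000Z/2023-02-01T00:00:00.000Z",
  "versions": ["2023-02-15T12:00:00.000Z"],
  "batchSize": 100,
  "limit": 1000
}
```

Unused segments in the interval whose version is not listed are left untouched; omitting `versions` (or passing null) keeps the default behavior of deleting all unused versions in the interval.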
org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -83,6 +84,12 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask */ private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100; + /** + * The version of segments to delete in this {@link #getInterval()}. + */ + @Nullable + private final List versions; + @Deprecated private final boolean markAsUnused; /** @@ -107,6 +114,7 @@ public KillUnusedSegmentsTask( @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, + @JsonProperty("versions") @Nullable List versions, @JsonProperty("context") Map context, @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused, @JsonProperty("batchSize") Integer batchSize, @@ -128,13 +136,27 @@ public KillUnusedSegmentsTask( if (limit != null && limit <= 0) { throw InvalidInput.exception("limit[%d] must be a positive integer.", limit); } - if (limit != null && Boolean.TRUE.equals(markAsUnused)) { - throw InvalidInput.exception("limit[%d] cannot be provided when markAsUnused is enabled.", limit); + if (Boolean.TRUE.equals(markAsUnused)) { + if (limit != null) { + throw InvalidInput.exception("limit[%d] cannot be provided when markAsUnused is enabled.", limit); + } + if (!CollectionUtils.isNullOrEmpty(versions)) { + throw InvalidInput.exception("versions[%s] cannot be provided when markAsUnused is enabled.", versions); + } } + this.versions = versions; this.limit = limit; this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime; } + + @Nullable + @JsonProperty + public List getVersions() + { + return versions; + } + /** * This field has been deprecated as "kill" tasks should not be responsible for * marking segments as unused. Instead, users should call the Coordinator API @@ -207,13 +229,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception @Nullable Integer numTotalBatches = getNumTotalBatches(); List unusedSegments; LOG.info( - "Starting kill for datasource[%s] in interval[%s] with batchSize[%d], up to limit[%d] segments " - + "before maxUsedStatusLastUpdatedTime[%s] will be deleted%s", - getDataSource(), - getInterval(), - batchSize, - limit, - maxUsedStatusLastUpdatedTime, + "Starting kill for datasource[%s] in interval[%s] and versions[%s] with batchSize[%d], up to limit[%d]" + + " segments before maxUsedStatusLastUpdatedTime[%s] will be deleted%s", + getDataSource(), getInterval(), getVersions(), batchSize, limit, maxUsedStatusLastUpdatedTime, numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "." 
); @@ -236,11 +254,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception break; } - unusedSegments = toolbox - .getTaskActionClient() - .submit( - new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize, maxUsedStatusLastUpdatedTime - )); + unusedSegments = toolbox.getTaskActionClient().submit( + new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), getVersions(), nextBatchSize, maxUsedStatusLastUpdatedTime) + ); // Fetch locks each time as a revokal could have occurred in between batches final NavigableMap> taskLockMap @@ -263,7 +279,6 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments))); - // Kill segments from the deep storage only if their load specs are not being used by any used segments final List segmentsToBeKilled = unusedSegments .stream() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java index bea3685ca244..e02af66e0465 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java @@ -87,7 +87,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception // List unused segments final List unusedSegments = toolbox .getTaskActionClient() - .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null)); + .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null, null)); // Verify none of these segments have versions > lock version for (final DataSegment unusedSegment : unusedSegments) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java index cc635383fed5..e3a0ecf633ee 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java @@ -80,7 +80,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception // List unused segments final List unusedSegments = toolbox .getTaskActionClient() - .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null)); + .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null, null)); // Verify none of these segments have versions > lock version for (final DataSegment unusedSegment : unusedSegments) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java index 2dee594aad68..e61cf4128b4e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java @@ -40,6 +40,8 @@ public class RetrieveSegmentsActionsTest { private static final Interval INTERVAL = Intervals.of("2017-10-01/2017-10-15"); + private static final String UNUSED_V0 = "v0"; + private static final String UNUSED_V1 = "v1"; @ClassRule public static TaskActionTestKit actionTestKit = new TaskActionTestKit(); @@ -56,9 +58,11 @@ public 
static void setup() throws IOException actionTestKit.getTaskLockbox().add(task); expectedUnusedSegments = new HashSet<>(); - expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-05/2017-10-06"), "1")); - expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-06/2017-10-07"), "1")); - expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), "1")); + expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-05/2017-10-06"), UNUSED_V0)); + expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-06/2017-10-07"), UNUSED_V0)); + expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), UNUSED_V0)); + expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-06/2017-10-07"), UNUSED_V1)); + expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), UNUSED_V1)); actionTestKit.getMetadataStorageCoordinator() .commitSegments(expectedUnusedSegments); @@ -88,7 +92,7 @@ private static DataSegment createSegment(Interval interval, String version) ImmutableList.of("dim1", "dim2"), ImmutableList.of("met1", "met2"), NoneShardSpec.instance(), - Integer.valueOf(version), + 9, 1 ); } @@ -98,31 +102,37 @@ public void testRetrieveUsedSegmentsAction() { final RetrieveUsedSegmentsAction action = new RetrieveUsedSegmentsAction(task.getDataSource(), ImmutableList.of(INTERVAL)); - final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); - Assert.assertEquals(expectedUsedSegments, resultSegments); + final Set observedUsedSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); + Assert.assertEquals(expectedUsedSegments, observedUsedSegments); } @Test - public void testRetrieveUnusedSegmentsAction() + public void testRetrieveUnusedSegmentsActionWithVersions() { - final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, null); - final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); - Assert.assertEquals(expectedUnusedSegments, resultSegments); + final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction( + task.getDataSource(), + INTERVAL, + ImmutableList.of(UNUSED_V0, UNUSED_V1), + null, + null + ); + final Set observedUnusedSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); + Assert.assertEquals(expectedUnusedSegments, observedUnusedSegments); } @Test public void testRetrieveUnusedSegmentsActionWithMinUsedLastUpdatedTime() { - final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, DateTimes.MIN); - final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); - Assert.assertEquals(ImmutableSet.of(), resultSegments); + final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, null, DateTimes.MIN); + final Set observedUnusedSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); + Assert.assertEquals(ImmutableSet.of(), observedUnusedSegments); } @Test public void testRetrieveUnusedSegmentsActionWithNowUsedLastUpdatedTime() { - final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, DateTimes.nowUtc()); - final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); - Assert.assertEquals(expectedUnusedSegments, 
resultSegments); + final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, null, DateTimes.nowUtc()); + final Set observedUnusedSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); + Assert.assertEquals(expectedUnusedSegments, observedUnusedSegments); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java index 7e7c9088d61f..e6dbd0bad13a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableList; import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery; import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.jackson.DefaultObjectMapper; @@ -52,6 +53,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro "killTaskId", "datasource", Intervals.of("2020-01-01/P1D"), + null, false, 99, 5, @@ -62,6 +64,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro Assert.assertEquals(taskQuery.getId(), fromJson.getId()); Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource()); Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval()); + Assert.assertNull(taskQuery.getVersions()); Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused()); Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize())); Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit()); @@ -75,6 +78,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault "killTaskId", "datasource", Intervals.of("2020-01-01/P1D"), + null, true, null, null, @@ -85,6 +89,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault Assert.assertEquals(taskQuery.getId(), fromJson.getId()); Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource()); Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval()); + Assert.assertNull(taskQuery.getVersions()); Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused()); Assert.assertEquals(100, fromJson.getBatchSize()); Assert.assertNull(taskQuery.getLimit()); @@ -99,6 +104,7 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro "datasource", Intervals.of("2020-01-01/P1D"), null, + null, true, 99, null, @@ -112,10 +118,11 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro Assert.assertEquals(task.getId(), taskQuery.getId()); Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource()); Assert.assertEquals(task.getInterval(), taskQuery.getInterval()); + Assert.assertNull(taskQuery.getVersions()); Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused()); Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize()); - Assert.assertNull(task.getLimit()); - Assert.assertNull(task.getMaxUsedStatusLastUpdatedTime()); + Assert.assertNull(taskQuery.getLimit()); + 
Assert.assertNull(taskQuery.getMaxUsedStatusLastUpdatedTime()); } @Test @@ -125,6 +132,7 @@ public void testKillUnusedSegmentsTaskWithNonNullValuesToClientKillUnusedSegment null, "datasource", Intervals.of("2020-01-01/P1D"), + ImmutableList.of("v1", "v2"), null, null, 99, @@ -139,6 +147,7 @@ public void testKillUnusedSegmentsTaskWithNonNullValuesToClientKillUnusedSegment Assert.assertEquals(task.getId(), taskQuery.getId()); Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource()); Assert.assertEquals(task.getInterval(), taskQuery.getInterval()); + Assert.assertEquals(task.getVersions(), taskQuery.getVersions()); Assert.assertNull(taskQuery.getMarkAsUnused()); Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize()); Assert.assertEquals(task.getLimit(), taskQuery.getLimit()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index d8e7a006605e..2de9a0f10f2a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -92,6 +92,7 @@ public void testKill() throws Exception DATA_SOURCE, Intervals.of("2019-03-01/2019-04-01"), null, + null, false, null, null, @@ -101,10 +102,11 @@ public void testKill() throws Exception Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); final List unusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( - DATA_SOURCE, - Intervals.of("2019/2020"), - null, - null + DATA_SOURCE, + Intervals.of("2019/2020"), + null, + null, + null ); Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"), version)), unusedSegments); @@ -120,32 +122,83 @@ public void testKill() throws Exception Assert.assertEquals(new KillTaskReport.Stats(1, 2, 0), getReportedStats()); } - /** - * {@code segment1}, {@code segment2} and {@code segment3} have different versions, but share the same load spec. - * {@code segment1} and {@code segment2} are unused segments, while {@code segment3} is a used segment. - * When a kill task is submitted, the unused segments {@code segment1} and {@code segment2} should be deleted from the - * metadata store, but should be retained in deep storage as the load spec is used by segment {@code segment3}. 
- */ @Test - public void testKillUnusedSegmentsWithUsedLoadSpec() throws Exception + public void testKillWithMarkUnused() throws Exception + { + final String version = DateTimes.nowUtc().toString(); + final Set segments = ImmutableSet.of( + newSegment(Intervals.of("2019-01-01/2019-02-01"), version), + newSegment(Intervals.of("2019-02-01/2019-03-01"), version), + newSegment(Intervals.of("2019-03-01/2019-04-01"), version), + newSegment(Intervals.of("2019-04-01/2019-05-01"), version) + ); + final Set announced = getMetadataStorageCoordinator().commitSegments(segments); + + Assert.assertEquals(segments, announced); + + Assert.assertTrue( + getSegmentsMetadataManager().markSegmentAsUnused( + newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId() + ) + ); + + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2019-03-01/2019-04-01"), + null, + null, + true, + null, + null, + null + ); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + + final List unusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2019/2020"), + null, + null, + null + ); + + Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"), version)), unusedSegments); + Assertions.assertThat( + getMetadataStorageCoordinator() + .retrieveUsedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"), Segments.ONLY_VISIBLE) + ).containsExactlyInAnyOrder( + newSegment(Intervals.of("2019-01-01/2019-02-01"), version), + newSegment(Intervals.of("2019-04-01/2019-05-01"), version) + ); + + Assert.assertEquals(new KillTaskReport.Stats(1, 2, 1), getReportedStats()); + } + + @Test + public void testKillSegmentsWithVersions() throws Exception { final DateTime now = DateTimes.nowUtc(); final String v1 = now.toString(); final String v2 = now.minusHours(2).toString(); final String v3 = now.minusHours(3).toString(); - final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1, ImmutableMap.of("foo", "1")); - final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v2, ImmutableMap.of("foo", "1")); - final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v3, ImmutableMap.of("foo", "1")); + final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1); + final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v1); + final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v1); + final DataSegment segment4 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v2); + final DataSegment segment5 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v3); - final Set segments = ImmutableSet.of(segment1, segment2, segment3); - final Set unusedSegments = ImmutableSet.of(segment1, segment2); + final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4, segment5); Assert.assertEquals(segments, getMetadataStorageCoordinator().commitSegments(segments)); Assert.assertEquals( - unusedSegments.size(), + segments.size(), getSegmentsMetadataManager().markSegmentsAsUnused( - unusedSegments.stream().map(DataSegment::getId).collect(Collectors.toSet()) + segments.stream().map(DataSegment::getId).collect(Collectors.toSet()) ) ); @@ -153,16 +206,17 @@ public void testKillUnusedSegmentsWithUsedLoadSpec() throws Exception null, DATA_SOURCE, Intervals.of("2018/2020"), + ImmutableList.of(v1, v2), null, false, + 3, null, - 100, null ); 
Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); Assert.assertEquals( - new KillTaskReport.Stats(0, 1, 0), + new KillTaskReport.Stats(4, 3, 0), getReportedStats() ); @@ -173,62 +227,168 @@ public void testKillUnusedSegmentsWithUsedLoadSpec() throws Exception null ); - Assert.assertEquals(ImmutableSet.of(), new HashSet<>(observedUnusedSegments)); + Assert.assertEquals(ImmutableSet.of(segment5), new HashSet<>(observedUnusedSegments)); } - @Test - public void testKillWithMarkUnused() throws Exception + public void testKillSegmentsWithVersionsAndLimit() throws Exception { - final String version = DateTimes.nowUtc().toString(); - final Set segments = ImmutableSet.of( - newSegment(Intervals.of("2019-01-01/2019-02-01"), version), - newSegment(Intervals.of("2019-02-01/2019-03-01"), version), - newSegment(Intervals.of("2019-03-01/2019-04-01"), version), - newSegment(Intervals.of("2019-04-01/2019-05-01"), version) + final DateTime now = DateTimes.nowUtc(); + final String v1 = now.toString(); + final String v2 = now.minusHours(2).toString(); + final String v3 = now.minusHours(3).toString(); + + final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1); + final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v1); + final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v1); + final DataSegment segment4 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v2); + final DataSegment segment5 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v3); + + final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4, segment5); + + Assert.assertEquals(segments, getMetadataStorageCoordinator().commitSegments(segments)); + Assert.assertEquals( + segments.size(), + getSegmentsMetadataManager().markSegmentsAsUnused( + segments.stream().map(DataSegment::getId).collect(Collectors.toSet()) + ) ); - final Set announced = getMetadataStorageCoordinator().commitSegments(segments); - Assert.assertEquals(segments, announced); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018/2020"), + ImmutableList.of(v1), + null, + false, + 3, + 2, + null + ); - Assert.assertTrue( - getSegmentsMetadataManager().markSegmentAsUnused( - newSegment(Intervals.of("2019-02-01/2019-03-01"), version).getId() + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + Assert.assertEquals( + new KillTaskReport.Stats(2, 1, 0), + getReportedStats() + ); + + final List observedUnusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2018/2020"), + null, + null + ); + + Assert.assertEquals(ImmutableSet.of(segment3, segment4, segment5), new HashSet<>(observedUnusedSegments)); + } + + @Test + public void testKillWithNonExistentVersion() throws Exception + { + final DateTime now = DateTimes.nowUtc(); + final String v1 = now.toString(); + final String v2 = now.minusHours(2).toString(); + final String v3 = now.minusHours(3).toString(); + + final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1); + final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v1); + final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v1); + final DataSegment segment4 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v2); + final DataSegment segment5 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v3); + + final Set segments = 
ImmutableSet.of(segment1, segment2, segment3, segment4, segment5); + + Assert.assertEquals(segments, getMetadataStorageCoordinator().commitSegments(segments)); + Assert.assertEquals( + segments.size(), + getSegmentsMetadataManager().markSegmentsAsUnused( + segments.stream().map(DataSegment::getId).collect(Collectors.toSet()) ) ); - final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2019-03-01/2019-04-01"), - null, - true, - null, - null, - null - ); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018/2020"), + ImmutableList.of(now.plusDays(100).toString()), + null, + false, + 3, + 2, + null + ); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + Assert.assertEquals( + new KillTaskReport.Stats(0, 1, 0), + getReportedStats() + ); - final List unusedSegments = - getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( - DATA_SOURCE, - Intervals.of("2019/2020"), - null, - null - ); + final List observedUnusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2018/2020"), + null, + null + ); - Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"), version)), unusedSegments); - Assertions.assertThat( - getMetadataStorageCoordinator() - .retrieveUsedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"), Segments.ONLY_VISIBLE) - ).containsExactlyInAnyOrder( - newSegment(Intervals.of("2019-01-01/2019-02-01"), version), - newSegment(Intervals.of("2019-04-01/2019-05-01"), version) + Assert.assertEquals(segments, new HashSet<>(observedUnusedSegments)); + } + + /** + * {@code segment1}, {@code segment2} and {@code segment3} have different versions, but share the same load spec. + * {@code segment1} and {@code segment2} are unused segments, while {@code segment3} is a used segment. + * When a kill task is submitted, the unused segments {@code segment1} and {@code segment2} should be deleted from the + * metadata store, but should be retained in deep storage as the load spec is used by {@code segment3}. 
+ */ + @Test + public void testKillUnusedSegmentsWithUsedLoadSpec() throws Exception + { + final DateTime now = DateTimes.nowUtc(); + final String v1 = now.toString(); + final String v2 = now.minusHours(2).toString(); + final String v3 = now.minusHours(3).toString(); + + final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1, ImmutableMap.of("foo", "1")); + final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v2, ImmutableMap.of("foo", "1")); + final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v3, ImmutableMap.of("foo", "1")); + + final Set segments = ImmutableSet.of(segment1, segment2, segment3); + final Set unusedSegments = ImmutableSet.of(segment1, segment2); + + Assert.assertEquals(segments, getMetadataStorageCoordinator().commitSegments(segments)); + Assert.assertEquals( + unusedSegments.size(), + getSegmentsMetadataManager().markSegmentsAsUnused( + unusedSegments.stream().map(DataSegment::getId).collect(Collectors.toSet()) + ) ); - Assert.assertEquals(new KillTaskReport.Stats(1, 2, 1), getReportedStats()); + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018/2020"), + ImmutableList.of(v1, v2), + null, + false, + null, + 100, + null + ); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + Assert.assertEquals( + new KillTaskReport.Stats(0, 1, 0), + getReportedStats() + ); + + final List observedUnusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2018/2020"), + null, + null + ); + + Assert.assertEquals(ImmutableSet.of(), new HashSet<>(observedUnusedSegments)); } @Test @@ -240,6 +400,7 @@ public void testGetInputSourceResources() DATA_SOURCE, Intervals.of("2019-03-01/2019-04-01"), null, + null, true, null, null, @@ -276,6 +437,7 @@ public void testKillBatchSizeOneAndLimit4() throws Exception DATA_SOURCE, Intervals.of("2018-01-01/2020-01-01"), null, + null, false, 1, 4, @@ -354,6 +516,7 @@ public void testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime() DATA_SOURCE, umbrellaInterval, null, + null, false, 1, 10, @@ -446,6 +609,7 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT DATA_SOURCE, umbrellaInterval, null, + null, false, 1, 10, @@ -471,6 +635,7 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT DATA_SOURCE, umbrellaInterval, null, + null, false, 1, 10, @@ -559,6 +724,7 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT DATA_SOURCE, umbrellaInterval, null, + null, false, 1, 10, @@ -569,10 +735,10 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT final List unusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( - DATA_SOURCE, - umbrellaInterval, - null, - null + DATA_SOURCE, + umbrellaInterval, + null, + null ); Assert.assertEquals(ImmutableList.of(segment2, segment3), unusedSegments); @@ -584,6 +750,7 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT DATA_SOURCE, umbrellaInterval, null, + null, false, 1, 10, @@ -593,14 +760,113 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task2).get().getStatusCode()); final List unusedSegments2 = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + umbrellaInterval, + null, + null + 
); + + Assert.assertEquals(ImmutableList.of(), unusedSegments2); + Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats()); + } + + @Test + public void testKillMultipleUnusedSegmentsWithVersionAndDifferentLastUpdatedTime() throws Exception + { + final DateTime version = DateTimes.nowUtc(); + final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version.toString()); + final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version.toString()); + final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version.toString()); + final DataSegment segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version.minusHours(2).toString()); + final DataSegment segment5 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version.minusHours(3).toString()); + + final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4, segment5); + Assert.assertEquals(segments, getMetadataStorageCoordinator().commitSegments(segments)); + + Assert.assertEquals( + 3, + getSegmentsMetadataManager().markSegmentsAsUnused( + ImmutableSet.of(segment1.getId(), segment2.getId(), segment4.getId()) + ) + ); + + // Capture the last updated time cutoff + final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc(); + + // Delay for 1s, mark the segments as unused and then capture the last updated time cutoff again + Thread.sleep(1000); + + Assert.assertEquals( + 2, + getSegmentsMetadataManager().markSegmentsAsUnused( + ImmutableSet.of(segment3.getId(), segment5.getId()) + ) + ); + final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc(); + + final List segmentIntervals = segments.stream() + .map(DataSegment::getInterval) + .collect(Collectors.toList()); + + final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals); + + final KillUnusedSegmentsTask task1 = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + umbrellaInterval, + ImmutableList.of(version.toString()), + null, + false, + 1, + 10, + maxUsedStatusLastUpdatedTime1 + ); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task1).get().getStatusCode()); + Assert.assertEquals( + new KillTaskReport.Stats(2, 3, 0), + getReportedStats() + ); + + final List observedUnusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( DATA_SOURCE, umbrellaInterval, null, null ); - Assert.assertEquals(ImmutableList.of(), unusedSegments2); - Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats()); + Assert.assertEquals(ImmutableSet.of(segment3, segment4, segment5), new HashSet<>(observedUnusedSegments)); + + final KillUnusedSegmentsTask task2 = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + umbrellaInterval, + ImmutableList.of(version.toString()), + null, + false, + 1, + 10, + maxUsedStatusLastUpdatedTime2 + ); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task2).get().getStatusCode()); + Assert.assertEquals( + new KillTaskReport.Stats(1, 2, 0), + getReportedStats() + ); + + final List observedUnusedSegments2 = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + umbrellaInterval, + null, + null + ); + + Assert.assertEquals(ImmutableSet.of(segment4, segment5), new HashSet<>(observedUnusedSegments2)); } @Test @@ -623,6 +889,7 @@ public void testKillBatchSizeThree() throws Exception DATA_SOURCE, Intervals.of("2018-01-01/2020-01-01"), null, + null, true, 3, null, @@ -634,11 +901,11 @@ public void testKillBatchSizeThree() throws Exception // we 
expect ALL tasks to be deleted final List unusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( - DATA_SOURCE, - Intervals.of("2019/2020"), - null, - null - ); + DATA_SOURCE, + Intervals.of("2019/2020"), + null, + null + ); Assert.assertEquals(Collections.emptyList(), unusedSegments); @@ -654,6 +921,7 @@ public void testComputeNextBatchSizeDefault() DATA_SOURCE, Intervals.of("2018-01-01/2020-01-01"), null, + null, false, null, null, @@ -671,6 +939,7 @@ public void testComputeNextBatchSizeWithBatchSizeLargerThanLimit() DATA_SOURCE, Intervals.of("2018-01-01/2020-01-01"), null, + null, false, 10, 5, @@ -688,6 +957,7 @@ public void testComputeNextBatchSizeWithBatchSizeSmallerThanLimit() DATA_SOURCE, Intervals.of("2018-01-01/2020-01-01"), null, + null, false, 5, 10, @@ -705,6 +975,7 @@ public void testComputeNextBatchSizeWithRemainingLessThanLimit() DATA_SOURCE, Intervals.of("2018-01-01/2020-01-01"), null, + null, false, 5, 10, @@ -722,6 +993,7 @@ public void testGetNumTotalBatchesDefault() DATA_SOURCE, Intervals.of("2018-01-01/2020-01-01"), null, + null, false, null, null, @@ -739,6 +1011,7 @@ public void testGetNumTotalBatchesWithBatchSizeLargerThanLimit() DATA_SOURCE, Intervals.of("2018-01-01/2020-01-01"), null, + null, false, 10, 5, @@ -758,6 +1031,7 @@ public void testInvalidLimit() DATA_SOURCE, Intervals.of("2018-01-01/2020-01-01"), null, + null, false, 10, 0, @@ -781,6 +1055,7 @@ public void testInvalidBatchSize() DATA_SOURCE, Intervals.of("2018-01-01/2020-01-01"), null, + null, false, 0, 10, @@ -794,7 +1069,7 @@ public void testInvalidBatchSize() } @Test - public void testInvalidMarkAsUnusedWithLimit() + public void testInvalidLimitWithMarkAsUnused() { MatcherAssert.assertThat( Assert.assertThrows( @@ -804,6 +1079,7 @@ public void testInvalidMarkAsUnusedWithLimit() DATA_SOURCE, Intervals.of("2018-01-01/2020-01-01"), null, + null, true, 10, 10, @@ -816,6 +1092,30 @@ public void testInvalidMarkAsUnusedWithLimit() ); } + @Test + public void testInvalidVersionWithMarkAsUnused() + { + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + ImmutableList.of("foo"), + null, + true, + 10, + null, + null + ) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "versions[[foo]] cannot be provided when markAsUnused is enabled." 
+ ) + ); + } + @Test public void testGetNumTotalBatchesWithBatchSizeSmallerThanLimit() { @@ -825,6 +1125,7 @@ public void testGetNumTotalBatchesWithBatchSizeSmallerThanLimit() DATA_SOURCE, Intervals.of("2018-01-01/2020-01-01"), null, + null, false, 5, 10, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 6c467bb7ac7c..852ea0df02dc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -924,6 +924,7 @@ public DataSegment apply(String input) "test_kill_task", Intervals.of("2011-04-01/P4D"), null, + null, false, null, null, @@ -1022,6 +1023,7 @@ public DataSegment apply(String input) "test_kill_task", Intervals.of("2011-04-01/P4D"), null, + null, false, null, maxSegmentsToKill, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 19eb1f8e7a2e..41626341bae0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -926,7 +926,7 @@ public void testKillTaskIsAudited() auditManager ); - Task task = new KillUnusedSegmentsTask("kill_all", "allow", Intervals.ETERNITY, null, false, 10, null, null); + Task task = new KillUnusedSegmentsTask("kill_all", "allow", Intervals.ETERNITY, null, null, false, 10, null, null); overlordResource.taskPost(task, req); Assert.assertTrue(auditEntryCapture.hasCaptured()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index f42a300de5f3..9ad3be6e3619 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -111,6 +111,18 @@ public List retrieveUnusedSegmentsForInterval( @Nullable Integer limit, @Nullable DateTime maxUsedStatusLastUpdatedTime ) + { + return retrieveUnusedSegmentsForInterval(dataSource, interval, null, limit, maxUsedStatusLastUpdatedTime); + } + + @Override + public List retrieveUnusedSegmentsForInterval( + String dataSource, + Interval interval, + @Nullable List versions, + @Nullable Integer limit, + @Nullable DateTime maxUsedStatusLastUpdatedTime + ) { synchronized (unusedSegments) { return ImmutableList.copyOf( diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java index e5656ff39752..4dfad3c97c0b 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java @@ -26,6 +26,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; +import java.util.List; import java.util.Objects; /** @@ -40,16 +41,21 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery private final String id; private 
final String dataSource; private final Interval interval; + @Nullable + private final List versions; private final Boolean markAsUnused; private final Integer batchSize; - @Nullable private final Integer limit; - @Nullable private final DateTime maxUsedStatusLastUpdatedTime; + @Nullable + private final Integer limit; + @Nullable + private final DateTime maxUsedStatusLastUpdatedTime; @JsonCreator public ClientKillUnusedSegmentsTaskQuery( @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, + @JsonProperty("versions") @Nullable List versions, @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused, @JsonProperty("batchSize") Integer batchSize, @JsonProperty("limit") @Nullable Integer limit, @@ -65,6 +71,7 @@ public ClientKillUnusedSegmentsTaskQuery( this.id = id; this.dataSource = dataSource; this.interval = interval; + this.versions = versions; this.markAsUnused = markAsUnused; this.batchSize = batchSize; this.limit = limit; @@ -98,6 +105,13 @@ public Interval getInterval() return interval; } + @JsonProperty + @Nullable + public List getVersions() + { + return versions; + } + /** * This field has been deprecated as "kill" tasks should not be responsible for * marking segments as unused. Instead, users should call the Coordinator API @@ -146,6 +160,7 @@ public boolean equals(Object o) return Objects.equals(id, that.id) && Objects.equals(dataSource, that.dataSource) && Objects.equals(interval, that.interval) + && Objects.equals(versions, that.versions) && Objects.equals(markAsUnused, that.markAsUnused) && Objects.equals(batchSize, that.batchSize) && Objects.equals(limit, that.limit) @@ -155,6 +170,6 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(id, dataSource, interval, markAsUnused, batchSize, limit, maxUsedStatusLastUpdatedTime); + return Objects.hash(id, dataSource, interval, versions, markAsUnused, batchSize, limit, maxUsedStatusLastUpdatedTime); } } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 31c975339007..72abff93df7a 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -102,9 +102,10 @@ default Collection retrieveUsedSegmentsForInterval( /** * Retrieve all published segments which may include any data in the given intervals and are marked as used from the * metadata store. - * + *

* The order of segments within the returned collection is unspecified, but each segment is guaranteed to appear in * the collection only once. + *

* * @param dataSource The data source to query * @param intervals The intervals for which all applicable and used segments are requested. @@ -135,16 +136,42 @@ Collection retrieveUsedSegmentsForIntervals( * @param interval Filter the data segments to ones that include data in this interval exclusively. * @param limit The maximum number of unused segments to retreive. If null, no limit is applied. * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code interval} - * with {@code used_status_last_updated} no later than this time will be included in the - * kill task. Segments without {@code used_status_last_updated} time (due to an upgrade - * from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored + * with {@code used_status_last_updated} no later than this time will be included in the + * kill task. Segments without {@code used_status_last_updated} time (due to an upgrade + * from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored + * @return DataSegments which include ONLY data within the requested interval and are marked as unused. Segments NOT + * returned here may include data in the interval + */ + default List retrieveUnusedSegmentsForInterval( + String dataSource, + Interval interval, + @Nullable Integer limit, + @Nullable DateTime maxUsedStatusLastUpdatedTime + ) + { + return retrieveUnusedSegmentsForInterval(dataSource, interval, null, limit, maxUsedStatusLastUpdatedTime); + } + + /** + * Retrieve all published segments which include ONLY data within the given interval and are marked as unused from the + * metadata store. * + * @param dataSource The data source the segments belong to + * @param interval Filter the data segments to ones that include data in this interval exclusively. + * @param versions An optional list of segment versions to retrieve in the given {@code interval}. If unspecified, all + * versions of unused segments in the {@code interval} must be retrieved. + * @param limit The maximum number of unused segments to retreive. If null, no limit is applied. + * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code interval} + * with {@code used_status_last_updated} no later than this time will be included in the + * kill task. Segments without {@code used_status_last_updated} time (due to an upgrade + * from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored * @return DataSegments which include ONLY data within the requested interval and are marked as unused. 
Segments NOT * returned here may include data in the interval */ List retrieveUnusedSegmentsForInterval( String dataSource, Interval interval, + @Nullable List versions, @Nullable Integer limit, @Nullable DateTime maxUsedStatusLastUpdatedTime ); diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 3d8939c3e52e..c886af6a6d93 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -233,6 +233,7 @@ public List> retrieveUsedSegmentsAndCreatedDates(Strin public List retrieveUnusedSegmentsForInterval( String dataSource, Interval interval, + @Nullable List versions, @Nullable Integer limit, @Nullable DateTime maxUsedStatusLastUpdatedTime ) @@ -244,6 +245,7 @@ public List retrieveUnusedSegmentsForInterval( .retrieveUnusedSegments( dataSource, Collections.singletonList(interval), + versions, limit, null, null, @@ -255,8 +257,8 @@ public List retrieveUnusedSegmentsForInterval( } ); - log.info("Found [%,d] unused segments for datasource[%s] in interval[%s] with maxUsedStatusLastUpdatedTime[%s].", - matchingSegments.size(), dataSource, interval, maxUsedStatusLastUpdatedTime); + log.info("Found [%,d] unused segments for datasource[%s] in interval[%s] and versions[%s] with maxUsedStatusLastUpdatedTime[%s].", + matchingSegments.size(), dataSource, interval, versions, maxUsedStatusLastUpdatedTime); return matchingSegments; } diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 0ddf4a2d2cc1..028f73ee42a7 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -685,7 +685,7 @@ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable } try (final CloseableIterator iterator = - queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null, null)) { + queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null, null, null)) { while (iterator.hasNext()) { final DataSegment dataSegment = iterator.next(); timeline.addSegments(Iterators.singletonIterator(dataSegment)); @@ -993,7 +993,7 @@ public Iterable iterateAllUnusedSegmentsForDatasource( ? 
Intervals.ONLY_ETERNITY : Collections.singletonList(interval); try (final CloseableIterator iterator = - queryTool.retrieveUnusedSegmentsPlus(datasource, intervals, limit, lastSegmentId, sortOrder, null)) { + queryTool.retrieveUnusedSegmentsPlus(datasource, intervals, null, limit, lastSegmentId, sortOrder, null)) { return ImmutableList.copyOf(iterator); } } diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index adbfb0fda237..822138975143 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -35,6 +35,7 @@ import org.apache.druid.server.http.DataSegmentPlus; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; import org.skife.jdbi.v2.Handle; @@ -112,7 +113,7 @@ public static SqlSegmentsMetadataQuery forHandle( * * This call does not return any information about realtime segments. * - * Returns a closeable iterator. You should close it when you are done. + * @return a closeable iterator. You should close it when you are done. */ public CloseableIterator retrieveUsedSegments( final String dataSource, @@ -122,6 +123,7 @@ public CloseableIterator retrieveUsedSegments( return retrieveSegments( dataSource, intervals, + null, IntervalMode.OVERLAPS, true, null, @@ -135,11 +137,14 @@ public CloseableIterator retrieveUsedSegments( * Retrieves segments for a given datasource that are marked unused and that are *fully contained by* any interval * in a particular collection of intervals. If the collection of intervals is empty, this method will retrieve all * unused segments. - * + *

* This call does not return any information about realtime segments. + *

* * @param dataSource The name of the datasource * @param intervals The intervals to search over + * @param versions An optional list of unused segment versions to retrieve in the given {@code intervals}. + * If unspecified, all versions of unused segments in the {@code intervals} must be retrieved. * @param limit The limit of segments to return * @param lastSegmentId the last segment id from which to search for results. All segments returned are > * this segment lexigraphically if sortOrder is null or ASC, or < this segment @@ -147,15 +152,17 @@ public CloseableIterator retrieveUsedSegments( * @param sortOrder Specifies the order with which to return the matching segments by start time, end time. * A null value indicates that order does not matter. * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code intervals} - * with {@code used_status_last_updated} no later than this time will be included in the - * iterator. Segments without {@code used_status_last_updated} time (due to an upgrade - * from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored - - * Returns a closeable iterator. You should close it when you are done. + * with {@code used_status_last_updated} no later than this time will be included in the + * iterator. Segments without {@code used_status_last_updated} time (due to an upgrade + * from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored + * + * @return a closeable iterator. You should close it when you are done. + * */ public CloseableIterator retrieveUnusedSegments( final String dataSource, final Collection intervals, + @Nullable final List versions, @Nullable final Integer limit, @Nullable final String lastSegmentId, @Nullable final SortOrder sortOrder, @@ -165,6 +172,7 @@ public CloseableIterator retrieveUnusedSegments( return retrieveSegments( dataSource, intervals, + versions, IntervalMode.CONTAINS, false, limit, @@ -194,11 +202,12 @@ public CloseableIterator retrieveUnusedSegments( * iterator. Segments without {@code used_status_last_updated} time (due to an upgrade * from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored - * Returns a closeable iterator. You should close it when you are done. + * @return a closeable iterator. You should close it when you are done. */ public CloseableIterator retrieveUnusedSegmentsPlus( final String dataSource, final Collection intervals, + @Nullable final List versions, @Nullable final Integer limit, @Nullable final String lastSegmentId, @Nullable final SortOrder sortOrder, @@ -208,6 +217,7 @@ public CloseableIterator retrieveUnusedSegmentsPlus( return retrieveSegmentsPlus( dataSource, intervals, + versions, IntervalMode.CONTAINS, false, limit, @@ -224,7 +234,7 @@ public CloseableIterator retrieveUnusedSegmentsPlus( * 1) ensure that the caller passes only used segments to this method when marking them as unused. * 2) Similarly, please try to call this method only on unused segments when marking segments as used with this method. * - * Returns the number of segments actually modified. + * @return the number of segments actually modified. */ public int markSegments(final Collection segmentIds, final boolean used) { @@ -261,7 +271,7 @@ public int markSegments(final Collection segmentIds, final boolean us /** * Marks all used segments that are *fully contained by* a particular interval as unused. * - * Returns the number of segments actually modified. + * @return the number of segments actually modified. 
@@ -224,7 +234,7 @@ public CloseableIterator<DataSegmentPlus> retrieveUnusedSegmentsPlus(
   * 1) ensure that the caller passes only used segments to this method when marking them as unused.
   * 2) Similarly, please try to call this method only on unused segments when marking segments as used with this method.
   *
-  * Returns the number of segments actually modified.
+  * @return the number of segments actually modified.
   */
  public int markSegments(final Collection<SegmentId> segmentIds, final boolean used)
  {
@@ -261,7 +271,7 @@ public int markSegments(final Collection<SegmentId> segmentIds, final boolean us
  /**
   * Marks all used segments that are *fully contained by* a particular interval as unused.
   *
-  * Returns the number of segments actually modified.
+  * @return the number of segments actually modified.
   */
  public int markSegmentsUnused(final String dataSource, final Interval interval)
  {
@@ -305,6 +315,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval)
            retrieveSegments(
                dataSource,
                Collections.singletonList(interval),
+               null,
                IntervalMode.CONTAINS,
                true,
                null,
@@ -444,6 +455,7 @@ public static void bindQueryIntervals(final Query<Map<String, Object>> query, fi
  private CloseableIterator<DataSegment> retrieveSegments(
      final String dataSource,
      final Collection<Interval> intervals,
+     @Nullable final List<String> versions,
      final IntervalMode matchMode,
      final boolean used,
      @Nullable final Integer limit,
@@ -454,7 +466,7 @@ private CloseableIterator<DataSegment> retrieveSegments(
  {
    if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
      return CloseableIterators.withEmptyBaggage(
-         retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)
+         retrieveSegmentsInIntervalsBatch(dataSource, intervals, versions, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)
      );
    } else {
      final List<List<Interval>> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
@@ -465,6 +477,7 @@ private CloseableIterator<DataSegment> retrieveSegments(
        final UnmodifiableIterator<DataSegment> iterator = retrieveSegmentsInIntervalsBatch(
            dataSource,
            intervalList,
+           versions,
            matchMode,
            used,
            limitPerBatch,
@@ -492,6 +505,7 @@ private CloseableIterator<DataSegment> retrieveSegments(
  private CloseableIterator<DataSegmentPlus> retrieveSegmentsPlus(
      final String dataSource,
      final Collection<Interval> intervals,
+     @Nullable final List<String> versions,
      final IntervalMode matchMode,
      final boolean used,
      @Nullable final Integer limit,
@@ -502,7 +516,7 @@ private CloseableIterator<DataSegmentPlus> retrieveSegmentsPlus(
  {
    if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
      return CloseableIterators.withEmptyBaggage(
-         retrieveSegmentsPlusInIntervalsBatch(dataSource, intervals, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)
+         retrieveSegmentsPlusInIntervalsBatch(dataSource, intervals, versions, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)
      );
    } else {
      final List<List<Interval>> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
@@ -513,6 +527,7 @@ private CloseableIterator<DataSegmentPlus> retrieveSegmentsPlus(
        final UnmodifiableIterator<DataSegmentPlus> iterator = retrieveSegmentsPlusInIntervalsBatch(
            dataSource,
            intervalList,
+           versions,
            matchMode,
            used,
            limitPerBatch,
@@ -540,6 +555,7 @@ private CloseableIterator<DataSegmentPlus> retrieveSegmentsPlus(
  private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
      final String dataSource,
      final Collection<Interval> intervals,
+     @Nullable final List<String> versions,
      final IntervalMode matchMode,
      final boolean used,
      @Nullable final Integer limit,
@@ -551,6 +567,7 @@ private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
    final Query<Map<String, Object>> sql = buildSegmentsTableQuery(
        dataSource,
        intervals,
+       versions,
        matchMode,
        used,
        limit,
@@ -568,6 +585,7 @@ private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
  private UnmodifiableIterator<DataSegmentPlus> retrieveSegmentsPlusInIntervalsBatch(
      final String dataSource,
      final Collection<Interval> intervals,
+     @Nullable final List<String> versions,
      final IntervalMode matchMode,
      final boolean used,
      @Nullable final Integer limit,
@@ -580,6 +598,7 @@ private UnmodifiableIterator<DataSegmentPlus> retrieveSegmentsPlusInIntervalsBat
    final Query<Map<String, Object>> sql = buildSegmentsTableQuery(
        dataSource,
        intervals,
+       versions,
        matchMode,
        used,
        limit,
@@ -597,6 +616,7 @@ private UnmodifiableIterator<DataSegmentPlus> retrieveSegmentsPlusInIntervalsBat
  private Query<Map<String, Object>> buildSegmentsTableQuery(
      final String dataSource,
      final Collection<Interval> intervals,
+     @Nullable final List<String> versions,
      final IntervalMode matchMode,
      final boolean used,
      @Nullable final Integer limit,
@@ -619,6 +639,13 @@ private Query<Map<String, Object>> buildSegmentsTableQuery(
      appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector);
    }

+   if (!CollectionUtils.isNullOrEmpty(versions)) {
+     final String versionsStr = versions.stream()
+                                        .map(version -> "'" + version + "'")
+                                        .collect(Collectors.joining(","));
+     sb.append(StringUtils.format(" AND version IN (%s)", versionsStr));
+   }
+
    // Add the used_status_last_updated time filter only for unused segments when maxUsedStatusLastUpdatedTime is non-null.
    final boolean addMaxUsedLastUpdatedTimeFilter = !used && maxUsedStatusLastUpdatedTime != null;
    if (addMaxUsedLastUpdatedTimeFilter) {
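To make the effect of the new filter concrete, here is an illustrative standalone snippet that mirrors the stream logic added in the hunk above. It is a sketch, not part of the patch; the version strings are invented examples, and plain concatenation stands in for `StringUtils.format`.

```java
// Illustrative only: how the version filter clause is built when `versions` is non-empty.
final List<String> versions = ImmutableList.of("2024-01-14T00:00:00.000Z", "2024-01-15T00:00:00.000Z");
final String versionsStr = versions.stream()
                                   .map(version -> "'" + version + "'")
                                   .collect(Collectors.joining(","));
// Appended to the WHERE clause of the segments-table query:
//   AND version IN ('2024-01-14T00:00:00.000Z','2024-01-15T00:00:00.000Z')
final String clause = " AND version IN (" + versionsStr + ")";
```

When `versions` is null or empty, no clause is appended and all versions in the matching intervals remain eligible.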
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
index 422803492d8e..57fab2fff4a8 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java
@@ -83,6 +83,8 @@ public interface OverlordClient
   * @param interval Umbrella interval to be considered by the kill task. Note that unused segments falling in this
   *                 widened umbrella interval may have different {@code used_status_last_updated} time, so the kill task
   *                 should also filter by {@code maxUsedStatusLastUpdatedTime}
+  * @param versions An optional list of segment versions to kill in the given {@code interval}. If unspecified, all
+  *                 versions of segments in the {@code interval} must be killed.
   * @param maxSegmentsToKill The maximum number of segments to kill
   * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code interval}
   *                                     with {@code used_status_last_updated} no later than this time will be included in the
@@ -95,6 +97,7 @@ default ListenableFuture<String> runKillTask(
      String idPrefix,
      String dataSource,
      Interval interval,
+     @Nullable List<String> versions,
      @Nullable Integer maxSegmentsToKill,
      @Nullable DateTime maxUsedStatusLastUpdatedTime
  )
@@ -104,6 +107,7 @@ default ListenableFuture<String> runKillTask(
        taskId,
        dataSource,
        interval,
+       versions,
        false,
        null,
        maxSegmentsToKill,
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
index 10d6e077c536..b61ab4878aae 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java
@@ -218,6 +218,7 @@ private void killUnusedSegments(
            TASK_ID_PREFIX,
            dataSource,
            intervalToKill,
+           null,
            maxSegmentsToKill,
            maxUsedStatusLastUpdatedTime
        ),
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 1546544029e8..bfc6ce03b07e 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -380,7 +380,7 @@ public Response killUnusedSegmentsInInterval(
      final Interval theInterval = Intervals.of(interval.replace('_', '/'));
      try {
        final String killTaskId = FutureUtils.getUnchecked(
-           overlordClient.runKillTask("api-issued", dataSourceName, theInterval, null, null),
+           overlordClient.runKillTask("api-issued", dataSourceName, theInterval, null, null, null),
            true
        );
        auditManager.doAudit(
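As a usage sketch (not part of the patch), a caller holding an `OverlordClient` can now restrict a kill task to specific segment versions; the client instance name, datasource, interval, and version strings below are assumptions chosen for illustration.

```java
// Sketch only: issue a kill task limited to two segment versions within an umbrella interval.
final ListenableFuture<String> killTaskIdFuture = overlordClient.runKillTask(
    "api-issued",                                  // idPrefix
    "wikipedia",                                   // dataSource
    Intervals.of("2023-01-01/2023-02-01"),         // interval
    ImmutableList.of("2024-01-14T00:00:00.000Z",
                     "2024-01-15T00:00:00.000Z"),  // versions: only these versions are killed
    null,                                          // maxSegmentsToKill: no limit
    null                                           // maxUsedStatusLastUpdatedTime: no cutoff
);
final String killTaskId = FutureUtils.getUnchecked(killTaskIdFuture, true);
```

Passing `null` for `versions`, as the coordinator duty and the HTTP resource do above, preserves the existing behavior of killing every unused version in the interval.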
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
index 0059d048e063..d683d59ac6e7 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java
@@ -46,6 +46,7 @@ public void setUp()
        "killTaskId",
        DATA_SOURCE,
        INTERVAL,
+       null,
        true,
        BATCH_SIZE,
        LIMIT,
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index da7bf4226921..31035ca316cd 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -1202,6 +1202,25 @@ public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitInRange() throw
    Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList())));
  }

+  @Test
+  public void testRetrieveUnusedSegmentsUsingSingleIntervalVersionAndLimitInRange() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
+    markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc());
+
+    final int requestedLimit = 10;
+    final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
+        DS.WIKI,
+        Intervals.of("1900/3000"),
+        ImmutableList.of("version"),
+        requestedLimit,
+        null
+    );
+
+    Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
+    Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList())));
+  }
+
  @Test
  public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitOutOfRange() throws IOException
  {
@@ -1680,6 +1699,83 @@ public void testSimpleUnusedList() throws IOException
    );
  }

+  @Test
+  public void testRetrieveUnusedSegmentsWithVersions() throws IOException
+  {
+    final DateTime now = DateTimes.nowUtc();
+    final String v1 = now.toString();
+    final String v2 = now.plusDays(2).toString();
+    final String v3 = now.plusDays(3).toString();
+    final String v4 = now.plusDays(4).toString();
+
+    final DataSegment segment1 = createSegment(
+        Intervals.of("2023-01-01/2023-01-02"),
+        v1,
+        new LinearShardSpec(0)
+    );
+    final DataSegment segment2 = createSegment(
+        Intervals.of("2023-01-02/2023-01-03"),
+        v2,
+        new LinearShardSpec(0)
+    );
+    final DataSegment segment3 = createSegment(
+        Intervals.of("2023-01-03/2023-01-04"),
+        v3,
+        new LinearShardSpec(0)
+    );
+    final DataSegment segment4 = createSegment(
+        Intervals.of("2023-01-03/2023-01-04"),
+        v4,
+        new LinearShardSpec(0)
+    );
+
+    final ImmutableSet<DataSegment> unusedSegments = ImmutableSet.of(segment1, segment2, segment3, segment4);
+    Assert.assertEquals(unusedSegments, coordinator.commitSegments(unusedSegments));
+    markAllSegmentsUnused(unusedSegments, DateTimes.nowUtc());
+
+    for (DataSegment unusedSegment : unusedSegments) {
+      Assertions.assertThat(
+          coordinator.retrieveUnusedSegmentsForInterval(
+              DS.WIKI,
+              Intervals.of("2023-01-01/2023-01-04"),
+              ImmutableList.of(unusedSegment.getVersion()),
+              null,
+              null
+          )
+      ).contains(unusedSegment);
+    }
+
+    Assertions.assertThat(
+        coordinator.retrieveUnusedSegmentsForInterval(
+            DS.WIKI,
+            Intervals.of("2023-01-01/2023-01-04"),
+            ImmutableList.of(v1, v2),
+            null,
+            null
+        )
+    ).contains(segment1, segment2);
+
+    Assertions.assertThat(
+        coordinator.retrieveUnusedSegmentsForInterval(
+            DS.WIKI,
+            Intervals.of("2023-01-01/2023-01-04"),
+            null,
+            null,
+            null
+        )
+    ).containsAll(unusedSegments);
+
+    Assertions.assertThat(
+        coordinator.retrieveUnusedSegmentsForInterval(
+            DS.WIKI,
+            Intervals.of("2023-01-01/2023-01-04"),
+            ImmutableList.of("some-non-existent-version"),
+            null,
+            null
+        )
+    ).containsAll(ImmutableSet.of());
+  }
+
  @Test
  public void testSimpleUnusedListWithLimit() throws IOException
  {
@@ -1690,6 +1786,7 @@ public void testSimpleUnusedListWithLimit() throws IOException
        coordinator.retrieveUnusedSegmentsForInterval(
            defaultSegment.getDataSource(),
            defaultSegment.getInterval(),
+           null,
            limit,
            null
        )
@@ -3168,6 +3265,7 @@ public void testMarkSegmentsAsUnusedWithinIntervalOneYear() throws IOException
            existingSegment1.getDataSource(),
            existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1)),
            null,
+           null,
            null
        )
    )
@@ -3496,7 +3594,7 @@ private ImmutableList<DataSegment> retrieveUnusedSegments(
            derbyConnectorRule.metadataTablesConfigSupplier().get(),
            mapper
        )
-       .retrieveUnusedSegments(DS.WIKI, intervals, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
+       .retrieveUnusedSegments(DS.WIKI, intervals, null, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
      return ImmutableList.copyOf(iterator);
    }
  }
@@ -3520,7 +3618,7 @@ private ImmutableList<DataSegmentPlus> retrieveUnusedSegmentsPlus(
            derbyConnectorRule.metadataTablesConfigSupplier().get(),
            mapper
        )
-       .retrieveUnusedSegmentsPlus(DS.WIKI, intervals, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
+       .retrieveUnusedSegmentsPlus(DS.WIKI, intervals, null, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
      return ImmutableList.copyOf(iterator);
    }
  }
diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
index cad3f21f06a2..047f94b125ca 100644
--- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
+++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java
@@ -434,6 +434,7 @@ public void test_taskPayload() throws ExecutionException, InterruptedException,
        null,
        null,
        null,
+       null,
        null
    );
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
index 649900c841fb..81bc6c8c490a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java
@@ -869,6 +869,7 @@ public ListenableFuture<String> runKillTask(
        String idPrefix,
        String dataSource,
        Interval interval,
+       @Nullable List<String> versions,
        @Nullable Integer maxSegmentsToKill,
        @Nullable DateTime maxUsedStatusLastUpdatedTime
    )
diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
index 089cd2e1d29c..79184fdf3270 100644
--- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
@@ -591,7 +591,7 @@ public void testKillSegmentsInIntervalInDataSource()
    Interval theInterval = Intervals.of(interval.replace('_', '/'));

    OverlordClient overlordClient = EasyMock.createStrictMock(OverlordClient.class);
-   EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1", theInterval, null, null))
+   EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1", theInterval, null, null, null))
        .andReturn(Futures.immediateFuture("kill_task_1"));
    EasyMock.replay(overlordClient, server);