diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 2155ac2c2655..bebb52157d6f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; @@ -49,6 +50,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.ReplaceTaskLock; +import org.apache.druid.query.QueryContexts; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -992,50 +994,76 @@ public Map> getLockedIntervals(List loc } /** - * Gets a List of Intervals locked by higher priority tasks for each datasource. - * Here, Segment Locks are being treated the same as Time Chunk Locks i.e. - * a Task with a Segment Lock is assumed to lock a whole Interval and not just - * the corresponding Segment. - * - * @param minTaskPriority Minimum task priority for each datasource. Only the - * Intervals that are locked by Tasks with equal or - * higher priority than this are returned. Locked intervals - * for datasources that are not present in this Map are - * not returned. - * @return Map from Datasource to List of Intervals locked by Tasks that have - * priority greater than or equal to the {@code minTaskPriority} for that datasource. + * @param lockFilterPolicies Lock filters for the given datasources + * @return Map from datasource to list of non-revoked locks with at least as much priority and an overlapping interval */ - public Map> getLockedIntervals(Map minTaskPriority) + public Map> getActiveLocks(List lockFilterPolicies) { - final Map> datasourceToIntervals = new HashMap<>(); + final Map> datasourceToLocks = new HashMap<>(); // Take a lock and populate the maps giant.lock(); + try { - running.forEach( - (datasource, datasourceLocks) -> { - // If this datasource is not requested, do not proceed - if (!minTaskPriority.containsKey(datasource)) { + lockFilterPolicies.forEach( + lockFilter -> { + final String datasource = lockFilter.getDatasource(); + if (!running.containsKey(datasource)) { return; } - datasourceLocks.forEach( + final int priority = lockFilter.getPriority(); + final List intervals; + if (lockFilter.getIntervals() != null) { + intervals = lockFilter.getIntervals(); + } else { + intervals = Collections.singletonList(Intervals.ETERNITY); + } + + final Map context = lockFilter.getContext(); + final boolean ignoreAppendLocks; + final Boolean useConcurrentLocks = QueryContexts.getAsBoolean( + Tasks.USE_CONCURRENT_LOCKS, + context.get(Tasks.USE_CONCURRENT_LOCKS) + ); + if (useConcurrentLocks == null) { + TaskLockType taskLockType = QueryContexts.getAsEnum( + Tasks.TASK_LOCK_TYPE, + context.get(Tasks.TASK_LOCK_TYPE), + TaskLockType.class + ); + if (taskLockType == null) { + ignoreAppendLocks = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; + } else { + ignoreAppendLocks = taskLockType == TaskLockType.APPEND; + } + } else { + ignoreAppendLocks = useConcurrentLocks; + } + + running.get(datasource).forEach( (startTime, startTimeLocks) -> startTimeLocks.forEach( (interval, taskLockPosses) -> taskLockPosses.forEach( taskLockPosse -> { if (taskLockPosse.getTaskLock().isRevoked()) { - // Do not proceed if the lock is revoked - return; + // do nothing } else if (taskLockPosse.getTaskLock().getPriority() == null - || taskLockPosse.getTaskLock().getPriority() < minTaskPriority.get(datasource)) { - // Do not proceed if the lock has a priority strictly less than the minimum - return; + || taskLockPosse.getTaskLock().getPriority() < priority) { + // do nothing + } else if (ignoreAppendLocks + && taskLockPosse.getTaskLock().getType() == TaskLockType.APPEND) { + // do nothing + } else { + for (Interval filterInterval : intervals) { + if (interval.overlaps(filterInterval)) { + datasourceToLocks.computeIfAbsent(datasource, ds -> new ArrayList<>()) + .add(taskLockPosse.getTaskLock()); + break; + } + } } - - datasourceToIntervals - .computeIfAbsent(datasource, k -> new HashSet<>()) - .add(interval); - }) + } + ) ) ); } @@ -1045,11 +1073,7 @@ public Map> getLockedIntervals(Map minTa giant.unlock(); } - return datasourceToIntervals.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> new ArrayList<>(entry.getValue()) - )); + return datasourceToLocks; } public void unlock(final Task task, final Interval interval) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java index f5351d7c6e51..b25bde067c7f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -27,6 +27,7 @@ import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import org.apache.druid.indexing.overlord.http.TaskStateLookup; @@ -94,19 +95,12 @@ public Map> getLockedIntervals(List loc } /** - * Gets a List of Intervals locked by higher priority tasks for each datasource. - * - * @param minTaskPriority Minimum task priority for each datasource. Only the - * Intervals that are locked by Tasks with equal or - * higher priority than this are returned. Locked intervals - * for datasources that are not present in this Map are - * not returned. - * @return Map from Datasource to List of Intervals locked by Tasks that have - * priority greater than or equal to the {@code minTaskPriority} for that datasource. + * @param lockFilterPolicies Requests for active locks for various datasources + * @return Map from datasource to conflicting lock infos */ - public Map> getLockedIntervals(Map minTaskPriority) + public Map> getActiveLocks(List lockFilterPolicies) { - return taskLockbox.getLockedIntervals(minTaskPriority); + return taskLockbox.getActiveLocks(lockFilterPolicies); } public List> getActiveTaskInfo(@Nullable String dataSource) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 54ada7cb2b43..b62e1de055f2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -241,33 +241,32 @@ public Response isLeader() } } - @Deprecated @POST - @Path("/lockedIntervals") + @Path("/lockedIntervals/v2") @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(StateResourceFilter.class) - public Response getDatasourceLockedIntervals(Map minTaskPriority) + public Response getDatasourceLockedIntervals(List lockFilterPolicies) { - if (minTaskPriority == null || minTaskPriority.isEmpty()) { - return Response.status(Status.BAD_REQUEST).entity("No Datasource provided").build(); + if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) { + return Response.status(Status.BAD_REQUEST).entity("No filter provided").build(); } // Build the response - return Response.ok(taskQueryTool.getLockedIntervals(minTaskPriority)).build(); + return Response.ok(taskQueryTool.getLockedIntervals(lockFilterPolicies)).build(); } @POST - @Path("/lockedIntervals/v2") + @Path("/activeLocks") @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(StateResourceFilter.class) - public Response getDatasourceLockedIntervalsV2(List lockFilterPolicies) + public Response getActiveLocks(List lockFilterPolicies) { if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) { return Response.status(Status.BAD_REQUEST).entity("No filter provided").build(); } // Build the response - return Response.ok(taskQueryTool.getLockedIntervals(lockFilterPolicies)).build(); + return Response.ok(new TaskLockResponse(taskQueryTool.getActiveLocks(lockFilterPolicies))).build(); } @GET diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskLockResponse.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskLockResponse.java new file mode 100644 index 000000000000..df4a5d8b03c8 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskLockResponse.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexing.common.TaskLock; + +import java.util.List; +import java.util.Map; + +public class TaskLockResponse +{ + private final Map> datasourceToLocks; + + @JsonCreator + public TaskLockResponse( + @JsonProperty("datasourceToLocks") final Map> datasourceToLocks + ) + { + this.datasourceToLocks = datasourceToLocks; + } + + @JsonProperty + public Map> getDatasourceToLocks() + { + return datasourceToLocks; + } + + @Override + public String toString() + { + return "TaskLockResponse{" + + "datasourceToLocks='" + datasourceToLocks + + '}'; + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index a02b51087675..a8c4b5117b1b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -75,7 +75,6 @@ import org.junit.Test; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -1174,99 +1173,6 @@ public void testGetTimeChunkAndSegmentLockForDifferentGroup() ); } - @Test - public void testGetLockedIntervals() - { - // Acquire locks for task1 - final Task task1 = NoopTask.forDatasource("ds1"); - lockbox.add(task1); - - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - task1, - Intervals.of("2017-01-01/2017-02-01") - ); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - task1, - Intervals.of("2017-04-01/2017-05-01") - ); - - // Acquire locks for task2 - final Task task2 = NoopTask.forDatasource("ds2"); - lockbox.add(task2); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - task2, - Intervals.of("2017-03-01/2017-04-01") - ); - - // Verify the locked intervals - final Map minTaskPriority = new HashMap<>(); - minTaskPriority.put(task1.getDataSource(), 10); - minTaskPriority.put(task2.getDataSource(), 10); - final Map> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertEquals(2, lockedIntervals.size()); - - Assert.assertEquals( - Arrays.asList( - Intervals.of("2017-01-01/2017-02-01"), - Intervals.of("2017-04-01/2017-05-01") - ), - lockedIntervals.get(task1.getDataSource()) - ); - - Assert.assertEquals( - Collections.singletonList( - Intervals.of("2017-03-01/2017-04-01")), - lockedIntervals.get(task2.getDataSource()) - ); - } - - @Test - public void testGetLockedIntervalsForLowPriorityTask() - { - // Acquire lock for a low priority task - final Task lowPriorityTask = NoopTask.ofPriority(5); - lockbox.add(lowPriorityTask); - taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId())); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - lowPriorityTask, - Intervals.of("2017/2018") - ); - - final Map minTaskPriority = new HashMap<>(); - minTaskPriority.put(lowPriorityTask.getDataSource(), 10); - - Map> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertTrue(lockedIntervals.isEmpty()); - } - - @Test - public void testGetLockedIntervalsForEqualPriorityTask() - { - // Acquire lock for a low priority task - final Task task = NoopTask.ofPriority(5); - lockbox.add(task); - taskStorage.insert(task, TaskStatus.running(task.getId())); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - task, - Intervals.of("2017/2018") - ); - - final Map minTaskPriority = new HashMap<>(); - minTaskPriority.put(task.getDataSource(), 5); - - Map> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertEquals(1, lockedIntervals.size()); - Assert.assertEquals( - Collections.singletonList(Intervals.of("2017/2018")), - lockedIntervals.get(task.getDataSource()) - ); - } - @Test public void testGetLockedIntervalsForHigherPriorityExclusiveLock() { @@ -1282,6 +1188,7 @@ public void testGetLockedIntervalsForHigherPriorityExclusiveLock() LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy( task.getDataSource(), 75, + null, null ); @@ -1305,6 +1212,7 @@ public void testGetLockedIntervalsForLowerPriorityExclusiveLock() LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy( task.getDataSource(), 25, + null, null ); @@ -1332,6 +1240,7 @@ public void testGetLockedIntervalsForLowerPriorityReplaceLock() LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy( task.getDataSource(), 25, + null, ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.REPLACE.name()) ); @@ -1355,6 +1264,7 @@ public void testGetLockedIntervalsForLowerPriorityUseConcurrentLocks() LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy( task.getDataSource(), 25, + null, ImmutableMap.of( Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE.name(), @@ -1369,6 +1279,171 @@ public void testGetLockedIntervalsForLowerPriorityUseConcurrentLocks() } + @Test + public void testGetActiveLocks() + { + final Set expectedLocks = new HashSet<>(); + final TaskLock overlappingReplaceLock = + validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50); + expectedLocks.add(overlappingReplaceLock); + + //Lower priority + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); + + final TaskLock overlappingAppendLock = + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75); + expectedLocks.add(overlappingAppendLock); + + // Non-overlapping interval + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); + + final TaskLock overlappingExclusiveLock = + validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50); + expectedLocks.add(overlappingExclusiveLock); + + LockFilterPolicy policy = new LockFilterPolicy( + "none", + 50, + ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")), + null + ); + + LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy( + "nonExistent", + 0, + null, + null + ); + + Map> activeLocks = + lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource)); + Assert.assertEquals(1, activeLocks.size()); + Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none"))); + } + + @Test + public void testGetActiveLocksWithAppendLockIgnoresAppendLocks() + { + final Set expectedLocks = new HashSet<>(); + final TaskLock overlappingReplaceLock = + validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50); + expectedLocks.add(overlappingReplaceLock); + + //Lower priority + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); + + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75); + + // Non-overlapping interval + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); + + final TaskLock overlappingExclusiveLock = + validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50); + expectedLocks.add(overlappingExclusiveLock); + + LockFilterPolicy policy = new LockFilterPolicy( + "none", + 50, + ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")), + ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name()) + ); + + LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy( + "nonExistent", + 0, + null, + null + ); + + Map> activeLocks = + lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource)); + Assert.assertEquals(1, activeLocks.size()); + Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none"))); + } + + @Test + public void testGetActiveLocksWithConcurrentLocksIgnoresAppendLocks() + { + final Set expectedLocks = new HashSet<>(); + final TaskLock overlappingReplaceLock = + validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50); + expectedLocks.add(overlappingReplaceLock); + + //Lower priority + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); + + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75); + + // Non-overlapping interval + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); + + final TaskLock overlappingExclusiveLock = + validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50); + expectedLocks.add(overlappingExclusiveLock); + + LockFilterPolicy policy = new LockFilterPolicy( + "none", + 50, + ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")), + ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true, Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE.name()) + ); + + LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy( + "nonExistent", + 0, + null, + null + ); + + Map> activeLocks = + lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource)); + Assert.assertEquals(1, activeLocks.size()); + Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none"))); + } + + @Test + public void testGetActiveLocksWithoutConcurrentLocksConsidersAppendLocks() + { + final Set expectedLocks = new HashSet<>(); + final TaskLock overlappingReplaceLock = + validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50); + + expectedLocks.add(overlappingReplaceLock); + + //Lower priority + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); + + final TaskLock overlappingAppendLock = + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75); + expectedLocks.add(overlappingAppendLock); + + // Non-overlapping interval + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); + + final TaskLock overlappingExclusiveLock = + validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50); + expectedLocks.add(overlappingExclusiveLock); + + LockFilterPolicy policy = new LockFilterPolicy( + "none", + 50, + ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")), + ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, false, Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name()) + ); + + LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy( + "nonExistent", + 0, + null, + null + ); + + Map> activeLocks = + lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource)); + Assert.assertEquals(1, activeLocks.size()); + Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none"))); + } + @Test public void testExclusiveLockCompatibility() { @@ -1770,50 +1845,6 @@ public void testTimechunkLockTypeTransitionForSameTaskGroup() validator.expectLockNotGranted(TaskLockType.APPEND, otherGroupTask, Intervals.of("2024/2025")); } - @Test - public void testGetLockedIntervalsForRevokedLocks() - { - // Acquire lock for a low priority task - final Task lowPriorityTask = NoopTask.ofPriority(5); - lockbox.add(lowPriorityTask); - taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId())); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - lowPriorityTask, - Intervals.of("2017/2018") - ); - - final Map minTaskPriority = new HashMap<>(); - minTaskPriority.put(lowPriorityTask.getDataSource(), 1); - - Map> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertEquals(1, lockedIntervals.size()); - Assert.assertEquals( - Collections.singletonList( - Intervals.of("2017/2018")), - lockedIntervals.get(lowPriorityTask.getDataSource()) - ); - - // Revoke the lowPriorityTask - final Task highPriorityTask = NoopTask.ofPriority(10); - lockbox.add(highPriorityTask); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - highPriorityTask, - Intervals.of("2017-05-01/2017-06-01") - ); - - // Verify the locked intervals - minTaskPriority.put(highPriorityTask.getDataSource(), 1); - lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertEquals(1, lockedIntervals.size()); - Assert.assertEquals( - Collections.singletonList( - Intervals.of("2017-05-01/2017-06-01")), - lockedIntervals.get(highPriorityTask.getDataSource()) - ); - } - @Test public void testFailedToReacquireTaskLock() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index e6dee0c7e403..93f80ac97096 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -36,6 +36,9 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.TimeChunkLock; import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; @@ -61,6 +64,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.UOE; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; @@ -1057,31 +1061,33 @@ public void testGetTaskStatus() throws Exception @Test public void testGetLockedIntervals() throws Exception { - final Map minTaskPriority = Collections.singletonMap("ds1", 0); - final Map> expectedLockedIntervals = Collections.singletonMap( + final List lockFilterPolicies = ImmutableList.of( + new LockFilterPolicy("ds1", 25, null, null) + ); + final Map> expectedIntervals = Collections.singletonMap( "ds1", Arrays.asList( Intervals.of("2012-01-01/2012-01-02"), - Intervals.of("2012-01-02/2012-01-03") + Intervals.of("2012-01-01/2012-01-02") ) ); - EasyMock.expect(taskLockbox.getLockedIntervals(minTaskPriority)) - .andReturn(expectedLockedIntervals); + EasyMock.expect(taskLockbox.getLockedIntervals(lockFilterPolicies)) + .andReturn(expectedIntervals); replayAll(); - final Response response = overlordResource.getDatasourceLockedIntervals(minTaskPriority); + final Response response = overlordResource.getDatasourceLockedIntervals(lockFilterPolicies); Assert.assertEquals(200, response.getStatus()); final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - Map> observedLockedIntervals = jsonMapper.readValue( + Map> observedIntervals = jsonMapper.readValue( jsonMapper.writeValueAsString(response.getEntity()), new TypeReference>>() { } ); - Assert.assertEquals(expectedLockedIntervals, observedLockedIntervals); + Assert.assertEquals(expectedIntervals, observedIntervals); } @Test @@ -1092,7 +1098,65 @@ public void testGetLockedIntervalsWithEmptyBody() Response response = overlordResource.getDatasourceLockedIntervals(null); Assert.assertEquals(400, response.getStatus()); - response = overlordResource.getDatasourceLockedIntervals(Collections.emptyMap()); + response = overlordResource.getDatasourceLockedIntervals(Collections.emptyList()); + Assert.assertEquals(400, response.getStatus()); + } + + @Test + public void testGetActiveLocks() throws Exception + { + final List lockFilterPolicies = ImmutableList.of( + new LockFilterPolicy("ds1", 25, null, null) + ); + final Map> expectedLocks = Collections.singletonMap( + "ds1", + Arrays.asList( + new TimeChunkLock( + TaskLockType.REPLACE, + "groupId", + "datasource", + Intervals.of("2012-01-01/2012-01-02"), + "version", + 25 + ), + new TimeChunkLock( + TaskLockType.EXCLUSIVE, + "groupId", + "datasource", + Intervals.of("2012-01-02/2012-01-03"), + "version", + 75 + ) + ) + ); + + EasyMock.expect(taskLockbox.getActiveLocks(lockFilterPolicies)) + .andReturn(expectedLocks); + replayAll(); + + final Response response = overlordResource.getActiveLocks(lockFilterPolicies); + Assert.assertEquals(200, response.getStatus()); + + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + Map> observedLocks = jsonMapper.readValue( + jsonMapper.writeValueAsString(response.getEntity()), + new TypeReference() + { + } + ).getDatasourceToLocks(); + + Assert.assertEquals(expectedLocks, observedLocks); + } + + @Test + public void testGetActiveLocksWithEmptyBody() + { + replayAll(); + + Response response = overlordResource.getActiveLocks(null); + Assert.assertEquals(400, response.getStatus()); + + response = overlordResource.getActiveLocks(Collections.emptyList()); Assert.assertEquals(400, response.getStatus()); } diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java index 65b8dc0b1ac0..84f2dff1d79d 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java @@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.utils.ITRetryUtil; @@ -343,12 +344,12 @@ public void testGetLockedIntervals() throws Exception submitIndexTask(INDEX_TASK, datasourceName); // Wait until it acquires a lock - final Map minTaskPriority = Collections.singletonMap(datasourceName, 0); + final List lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null)); final Map> lockedIntervals = new HashMap<>(); ITRetryUtil.retryUntilFalse( () -> { lockedIntervals.clear(); - lockedIntervals.putAll(indexer.getLockedIntervals(minTaskPriority)); + lockedIntervals.putAll(indexer.getLockedIntervals(lockFilterPolicies)); return lockedIntervals.isEmpty(); }, "Verify Intervals are Locked" diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index 8167b9b64e12..f75dc6043f9d 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -39,6 +39,7 @@ import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.TestClient; @@ -334,13 +335,13 @@ public TaskReport.ReportMap getTaskReport(String taskId) } } - public Map> getLockedIntervals(Map minTaskPriority) + public Map> getLockedIntervals(List lockFilterPolicies) { try { - String jsonBody = jsonMapper.writeValueAsString(minTaskPriority); + String jsonBody = jsonMapper.writeValueAsString(lockFilterPolicies); StatusResponseHolder response = httpClient.go( - new Request(HttpMethod.POST, new URL(getIndexerURL() + "lockedIntervals")) + new Request(HttpMethod.POST, new URL(getIndexerURL() + "lockedIntervals/v2")) .setContent( "application/json", StringUtils.toUtf8(jsonBody) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java index 8d980d76f12a..79d63cb4550a 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java @@ -19,11 +19,13 @@ package org.apache.druid.tests.coordinator.duty; +import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -53,7 +55,6 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -265,13 +266,13 @@ private void ensureSegmentsLoaded() */ private void ensureLockedIntervals(Interval... intervals) { - final Map minTaskPriority = Collections.singletonMap(fullDatasourceName, 0); + final LockFilterPolicy lockFilterPolicy = new LockFilterPolicy(fullDatasourceName, 0, null, null); final List lockedIntervals = new ArrayList<>(); ITRetryUtil.retryUntilTrue( () -> { lockedIntervals.clear(); - Map> allIntervals = indexer.getLockedIntervals(minTaskPriority); + Map> allIntervals = indexer.getLockedIntervals(ImmutableList.of(lockFilterPolicy)); if (allIntervals.containsKey(fullDatasourceName)) { lockedIntervals.addAll(allIntervals.get(fullDatasourceName)); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java index f527135c80d3..dfe308e2c1b1 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java @@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.guice.DruidTestModuleFactory; @@ -342,12 +343,12 @@ public void testGetLockedIntervals() throws Exception submitIndexTask(INDEX_TASK, datasourceName); // Wait until it acquires a lock - final Map minTaskPriority = Collections.singletonMap(datasourceName, 0); + final List lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null)); final Map> lockedIntervals = new HashMap<>(); ITRetryUtil.retryUntilFalse( () -> { lockedIntervals.clear(); - lockedIntervals.putAll(indexer.getLockedIntervals(minTaskPriority)); + lockedIntervals.putAll(indexer.getLockedIntervals(lockFilterPolicies)); return lockedIntervals.isEmpty(); }, "Verify Intervals are Locked" diff --git a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java index 88ab4673aa8a..019fd22807c3 100644 --- a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java +++ b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java @@ -21,10 +21,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.Collections; +import java.util.List; import java.util.Map; -import java.util.Objects; /** * Specifies a policy to filter active locks held by a datasource @@ -33,17 +35,20 @@ public class LockFilterPolicy { private final String datasource; private final int priority; + private final List intervals; private final Map context; @JsonCreator public LockFilterPolicy( @JsonProperty("datasource") String datasource, @JsonProperty("priority") int priority, - @JsonProperty("context") Map context + @JsonProperty("intervals") @Nullable List intervals, + @JsonProperty("context") @Nullable Map context ) { this.datasource = datasource; this.priority = priority; + this.intervals = intervals; this.context = context == null ? Collections.emptyMap() : context; } @@ -65,24 +70,10 @@ public Map getContext() return context; } - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - LockFilterPolicy that = (LockFilterPolicy) o; - return Objects.equals(datasource, that.datasource) - && priority == that.priority - && Objects.equals(context, that.context); - } - - @Override - public int hashCode() + @Nullable + @JsonProperty + public List getIntervals() { - return Objects.hash(datasource, priority, context); + return intervals; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 01f3bc77e9ee..d6e25fc9ac69 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -278,7 +278,8 @@ private Map> getLockedIntervals( { final List lockFilterPolicies = compactionConfigs .stream() - .map(config -> new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), config.getTaskContext())) + .map(config -> + new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), null, config.getTaskContext())) .collect(Collectors.toList()); final Map> datasourceToLockedIntervals = new HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(lockFilterPolicies), true)); diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java index 8c3b867e3681..5f5837462667 100644 --- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -225,7 +225,7 @@ public void test_findLockedIntervals() throws Exception final Map> lockMap = ImmutableMap.of("foo", Collections.singletonList(Intervals.of("2000/2001"))); final List requests = ImmutableList.of( - new LockFilterPolicy("foo", 3, null) + new LockFilterPolicy("foo", 3, null, null) ); serviceClient.expectAndRespond( @@ -246,7 +246,7 @@ public void test_findLockedIntervals() throws Exception public void test_findLockedIntervals_nullReturn() throws Exception { final List requests = ImmutableList.of( - new LockFilterPolicy("foo", 3, null) + new LockFilterPolicy("foo", 3, null, null) ); serviceClient.expectAndRespond(