From 42bc105ff463e661c6216e9e1afbfa421f57f739 Mon Sep 17 00:00:00 2001 From: Amatya Date: Thu, 25 Jul 2024 10:14:39 +0530 Subject: [PATCH 1/6] Add API to fetch conflicting locks --- .../druid/indexing/common/TaskLock.java | 6 + .../druid/indexing/overlord/TaskLockbox.java | 107 ++------ .../indexing/overlord/TaskQueryTool.java | 25 +- .../overlord/http/OverlordResource.java | 21 +- .../indexing/overlord/TaskLockboxTest.java | 258 ++---------------- .../overlord/http/OverlordResourceTest.java | 42 ++- .../druid/metadata/LockFilterPolicy.java | 23 +- .../apache/druid/metadata/TaskLockInfo.java | 108 ++++++++ .../druid/rpc/indexing/OverlordClient.java | 9 +- .../rpc/indexing/OverlordClientImpl.java | 9 +- .../coordinator/duty/CompactSegments.java | 20 +- .../client/indexing/NoopOverlordClient.java | 3 +- .../rpc/indexing/OverlordClientImplTest.java | 28 +- .../coordinator/duty/CompactSegmentsTest.java | 38 ++- 14 files changed, 289 insertions(+), 408 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/metadata/TaskLockInfo.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java index eb96eb4fed91..e3f5c0a42809 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.overlord.LockRequest; +import org.apache.druid.metadata.TaskLockInfo; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -77,4 +78,9 @@ default void assertNotRevoked() .build("Lock of type[%s] for interval[%s] was revoked", getType(), getInterval()); } } + + default TaskLockInfo toTaskLockInfo() + { + return new TaskLockInfo(getGranularity().name(), getType().name(), 
getNonNullPriority(), getInterval()); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 2155ac2c2655..bb033aea9c08 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -40,8 +40,8 @@ import org.apache.druid.indexing.common.actions.SegmentAllocateResult; import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; @@ -49,6 +49,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.ReplaceTaskLock; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -925,11 +926,11 @@ private Set getNonRevokedReplaceLocks(List posse /** * @param lockFilterPolicies Lock filters for the given datasources - * @return Map from datasource to intervals locked by tasks satisfying the lock filter condititions + * @return Map from datasource to list of non-revoked lock infos with at least as much priority and an overlapping interval */ - public Map> getLockedIntervals(List lockFilterPolicies) + public Map> getConflictingLockInfos(List lockFilterPolicies) { - final Map> datasourceToIntervals = new HashMap<>(); + final Map> datasourceToLocks = new HashMap<>(); // Take a lock and populate the maps giant.lock(); @@ -943,19 +944,12 @@ public 
Map> getLockedIntervals(List loc } final int priority = lockFilter.getPriority(); - final boolean isReplaceLock = TaskLockType.REPLACE.name().equals( - lockFilter.getContext().getOrDefault( - Tasks.TASK_LOCK_TYPE, - Tasks.DEFAULT_TASK_LOCK_TYPE - ) - ); - final boolean isUsingConcurrentLocks = Boolean.TRUE.equals( - lockFilter.getContext().getOrDefault( - Tasks.USE_CONCURRENT_LOCKS, - Tasks.DEFAULT_USE_CONCURRENT_LOCKS - ) - ); - final boolean ignoreAppendLocks = isUsingConcurrentLocks || isReplaceLock; + final List intervals; + if (lockFilter.getIntervals() != null) { + intervals = lockFilter.getIntervals(); + } else { + intervals = Collections.singletonList(Intervals.ETERNITY); + } running.get(datasource).forEach( (startTime, startTimeLocks) -> startTimeLocks.forEach( @@ -963,15 +957,17 @@ public Map> getLockedIntervals(List loc taskLockPosse -> { if (taskLockPosse.getTaskLock().isRevoked()) { // do nothing - } else if (ignoreAppendLocks - && TaskLockType.APPEND.equals(taskLockPosse.getTaskLock().getType())) { - // do nothing } else if (taskLockPosse.getTaskLock().getPriority() == null || taskLockPosse.getTaskLock().getPriority() < priority) { // do nothing } else { - datasourceToIntervals.computeIfAbsent(datasource, k -> new HashSet<>()) - .add(interval); + for (Interval filterInterval : intervals) { + if (interval.overlaps(filterInterval)) { + datasourceToLocks.computeIfAbsent(datasource, ds -> new ArrayList<>()) + .add(taskLockPosse.getTaskLock().toTaskLockInfo()); + break; + } + } } } ) @@ -984,72 +980,7 @@ public Map> getLockedIntervals(List loc giant.unlock(); } - return datasourceToIntervals.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> new ArrayList<>(entry.getValue()) - )); - } - - /** - * Gets a List of Intervals locked by higher priority tasks for each datasource. - * Here, Segment Locks are being treated the same as Time Chunk Locks i.e. 
- * a Task with a Segment Lock is assumed to lock a whole Interval and not just - * the corresponding Segment. - * - * @param minTaskPriority Minimum task priority for each datasource. Only the - * Intervals that are locked by Tasks with equal or - * higher priority than this are returned. Locked intervals - * for datasources that are not present in this Map are - * not returned. - * @return Map from Datasource to List of Intervals locked by Tasks that have - * priority greater than or equal to the {@code minTaskPriority} for that datasource. - */ - public Map> getLockedIntervals(Map minTaskPriority) - { - final Map> datasourceToIntervals = new HashMap<>(); - - // Take a lock and populate the maps - giant.lock(); - try { - running.forEach( - (datasource, datasourceLocks) -> { - // If this datasource is not requested, do not proceed - if (!minTaskPriority.containsKey(datasource)) { - return; - } - - datasourceLocks.forEach( - (startTime, startTimeLocks) -> startTimeLocks.forEach( - (interval, taskLockPosses) -> taskLockPosses.forEach( - taskLockPosse -> { - if (taskLockPosse.getTaskLock().isRevoked()) { - // Do not proceed if the lock is revoked - return; - } else if (taskLockPosse.getTaskLock().getPriority() == null - || taskLockPosse.getTaskLock().getPriority() < minTaskPriority.get(datasource)) { - // Do not proceed if the lock has a priority strictly less than the minimum - return; - } - - datasourceToIntervals - .computeIfAbsent(datasource, k -> new HashSet<>()) - .add(interval); - }) - ) - ); - } - ); - } - finally { - giant.unlock(); - } - - return datasourceToIntervals.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> new ArrayList<>(entry.getValue()) - )); + return datasourceToLocks; } public void unlock(final Task task, final Interval interval) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java 
index f5351d7c6e51..e5dab8526a91 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.joda.time.Duration; @@ -85,28 +86,12 @@ public TaskQueryTool( } /** - * @param lockFilterPolicies Requests for conflicing lock intervals for various datasources - * @return Map from datasource to intervals locked by tasks that have a conflicting lock type that cannot be revoked + * @param lockFilterPolicies Requests for conflicting locks for various datasources + * @return Map from datasource to conflicting lock infos */ - public Map> getLockedIntervals(List lockFilterPolicies) + public Map> getConflictingLockInfos(List lockFilterPolicies) { - return taskLockbox.getLockedIntervals(lockFilterPolicies); - } - - /** - * Gets a List of Intervals locked by higher priority tasks for each datasource. - * - * @param minTaskPriority Minimum task priority for each datasource. Only the - * Intervals that are locked by Tasks with equal or - * higher priority than this are returned. Locked intervals - * for datasources that are not present in this Map are - * not returned. - * @return Map from Datasource to List of Intervals locked by Tasks that have - * priority greater than or equal to the {@code minTaskPriority} for that datasource. 
- */ - public Map> getLockedIntervals(Map minTaskPriority) - { - return taskLockbox.getLockedIntervals(minTaskPriority); + return taskLockbox.getConflictingLockInfos(lockFilterPolicies); } public List> getActiveTaskInfo(@Nullable String dataSource) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 54ada7cb2b43..fa87b415a782 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -241,33 +241,18 @@ public Response isLeader() } } - @Deprecated - @POST - @Path("/lockedIntervals") - @Produces(MediaType.APPLICATION_JSON) - @ResourceFilters(StateResourceFilter.class) - public Response getDatasourceLockedIntervals(Map minTaskPriority) - { - if (minTaskPriority == null || minTaskPriority.isEmpty()) { - return Response.status(Status.BAD_REQUEST).entity("No Datasource provided").build(); - } - - // Build the response - return Response.ok(taskQueryTool.getLockedIntervals(minTaskPriority)).build(); - } - @POST - @Path("/lockedIntervals/v2") + @Path("/conflictingLocks") @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(StateResourceFilter.class) - public Response getDatasourceLockedIntervalsV2(List lockFilterPolicies) + public Response getConflictingLockInfos(List lockFilterPolicies) { if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) { return Response.status(Status.BAD_REQUEST).entity("No filter provided").build(); } // Build the response - return Response.ok(taskQueryTool.getLockedIntervals(lockFilterPolicies)).build(); + return Response.ok(taskQueryTool.getConflictingLockInfos(lockFilterPolicies)).build(); } @GET diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index a02b51087675..d3ce5b702bd8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -26,7 +26,6 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.druid.indexer.TaskStatus; @@ -42,7 +41,6 @@ import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.overlord.TaskLockbox.TaskLockPosse; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; @@ -55,6 +53,7 @@ import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; @@ -75,7 +74,6 @@ import org.junit.Test; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -1175,200 +1173,48 @@ public void testGetTimeChunkAndSegmentLockForDifferentGroup() } @Test - public void testGetLockedIntervals() + public void testGetConflictingLockInfos() { - // Acquire locks for task1 - final Task task1 = NoopTask.forDatasource("ds1"); - lockbox.add(task1); - - tryTimeChunkLock( 
- TaskLockType.EXCLUSIVE, - task1, - Intervals.of("2017-01-01/2017-02-01") - ); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - task1, - Intervals.of("2017-04-01/2017-05-01") - ); + final Set expectedConflicts = new HashSet<>(); + final TaskLockInfo overlappingReplaceLock = + validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50) + .toTaskLockInfo(); + expectedConflicts.add(overlappingReplaceLock); - // Acquire locks for task2 - final Task task2 = NoopTask.forDatasource("ds2"); - lockbox.add(task2); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - task2, - Intervals.of("2017-03-01/2017-04-01") - ); + //Lower priority + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); - // Verify the locked intervals - final Map minTaskPriority = new HashMap<>(); - minTaskPriority.put(task1.getDataSource(), 10); - minTaskPriority.put(task2.getDataSource(), 10); - final Map> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertEquals(2, lockedIntervals.size()); + final TaskLockInfo overlappingAppendLock = + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75) + .toTaskLockInfo(); + expectedConflicts.add(overlappingAppendLock); - Assert.assertEquals( - Arrays.asList( - Intervals.of("2017-01-01/2017-02-01"), - Intervals.of("2017-04-01/2017-05-01") - ), - lockedIntervals.get(task1.getDataSource()) - ); + // Non-overlapping interval + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); - Assert.assertEquals( - Collections.singletonList( - Intervals.of("2017-03-01/2017-04-01")), - lockedIntervals.get(task2.getDataSource()) - ); - } + final TaskLockInfo overlappingExclusiveLock = + validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50) + .toTaskLockInfo(); + expectedConflicts.add(overlappingExclusiveLock); - @Test - public void testGetLockedIntervalsForLowPriorityTask() - { - // Acquire lock for a low 
priority task - final Task lowPriorityTask = NoopTask.ofPriority(5); - lockbox.add(lowPriorityTask); - taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId())); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - lowPriorityTask, - Intervals.of("2017/2018") + LockFilterPolicy policy = new LockFilterPolicy( + "none", + 50, + ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")) ); - final Map minTaskPriority = new HashMap<>(); - minTaskPriority.put(lowPriorityTask.getDataSource(), 10); - - Map> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertTrue(lockedIntervals.isEmpty()); - } - - @Test - public void testGetLockedIntervalsForEqualPriorityTask() - { - // Acquire lock for a low priority task - final Task task = NoopTask.ofPriority(5); - lockbox.add(task); - taskStorage.insert(task, TaskStatus.running(task.getId())); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - task, - Intervals.of("2017/2018") - ); - - final Map minTaskPriority = new HashMap<>(); - minTaskPriority.put(task.getDataSource(), 5); - - Map> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertEquals(1, lockedIntervals.size()); - Assert.assertEquals( - Collections.singletonList(Intervals.of("2017/2018")), - lockedIntervals.get(task.getDataSource()) - ); - } - - @Test - public void testGetLockedIntervalsForHigherPriorityExclusiveLock() - { - final Task task = NoopTask.ofPriority(50); - lockbox.add(task); - taskStorage.insert(task, TaskStatus.running(task.getId())); - tryTimeChunkLock( - TaskLockType.APPEND, - task, - Intervals.of("2017/2018") - ); - - LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy( - task.getDataSource(), - 75, + LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy( + "nonExistent", + 0, null ); - Map> conflictingIntervals = - lockbox.getLockedIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock)); - 
Assert.assertTrue(conflictingIntervals.isEmpty()); + Map> conflictingLocks = + lockbox.getConflictingLockInfos(ImmutableList.of(policy, policyForNonExistentDatasource)); + Assert.assertEquals(1, conflictingLocks.size()); + Assert.assertEquals(expectedConflicts, new HashSet<>(conflictingLocks.get("none"))); } - @Test - public void testGetLockedIntervalsForLowerPriorityExclusiveLock() - { - final Task task = NoopTask.ofPriority(50); - lockbox.add(task); - taskStorage.insert(task, TaskStatus.running(task.getId())); - tryTimeChunkLock( - TaskLockType.APPEND, - task, - Intervals.of("2017/2018") - ); - - LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy( - task.getDataSource(), - 25, - null - ); - - Map> conflictingIntervals = - lockbox.getLockedIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock)); - Assert.assertEquals(1, conflictingIntervals.size()); - Assert.assertEquals( - Collections.singletonList(Intervals.of("2017/2018")), - conflictingIntervals.get(task.getDataSource()) - ); - } - - @Test - public void testGetLockedIntervalsForLowerPriorityReplaceLock() - { - final Task task = NoopTask.ofPriority(50); - lockbox.add(task); - taskStorage.insert(task, TaskStatus.running(task.getId())); - tryTimeChunkLock( - TaskLockType.APPEND, - task, - Intervals.of("2017/2018") - ); - - LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy( - task.getDataSource(), - 25, - ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.REPLACE.name()) - ); - - Map> conflictingIntervals = - lockbox.getLockedIntervals(ImmutableList.of(requestForReplaceLowerPriorityLock)); - Assert.assertTrue(conflictingIntervals.isEmpty()); - } - - @Test - public void testGetLockedIntervalsForLowerPriorityUseConcurrentLocks() - { - final Task task = NoopTask.ofPriority(50); - lockbox.add(task); - taskStorage.insert(task, TaskStatus.running(task.getId())); - tryTimeChunkLock( - TaskLockType.APPEND, - task, - Intervals.of("2017/2018") - ); - - 
LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy( - task.getDataSource(), - 25, - ImmutableMap.of( - Tasks.TASK_LOCK_TYPE, - TaskLockType.EXCLUSIVE.name(), - Tasks.USE_CONCURRENT_LOCKS, - true - ) - ); - - Map> conflictingIntervals = - lockbox.getLockedIntervals(ImmutableList.of(requestForReplaceLowerPriorityLock)); - Assert.assertTrue(conflictingIntervals.isEmpty()); - } - - @Test public void testExclusiveLockCompatibility() { @@ -1770,50 +1616,6 @@ public void testTimechunkLockTypeTransitionForSameTaskGroup() validator.expectLockNotGranted(TaskLockType.APPEND, otherGroupTask, Intervals.of("2024/2025")); } - @Test - public void testGetLockedIntervalsForRevokedLocks() - { - // Acquire lock for a low priority task - final Task lowPriorityTask = NoopTask.ofPriority(5); - lockbox.add(lowPriorityTask); - taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId())); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - lowPriorityTask, - Intervals.of("2017/2018") - ); - - final Map minTaskPriority = new HashMap<>(); - minTaskPriority.put(lowPriorityTask.getDataSource(), 1); - - Map> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertEquals(1, lockedIntervals.size()); - Assert.assertEquals( - Collections.singletonList( - Intervals.of("2017/2018")), - lockedIntervals.get(lowPriorityTask.getDataSource()) - ); - - // Revoke the lowPriorityTask - final Task highPriorityTask = NoopTask.ofPriority(10); - lockbox.add(highPriorityTask); - tryTimeChunkLock( - TaskLockType.EXCLUSIVE, - highPriorityTask, - Intervals.of("2017-05-01/2017-06-01") - ); - - // Verify the locked intervals - minTaskPriority.put(highPriorityTask.getDataSource(), 1); - lockedIntervals = lockbox.getLockedIntervals(minTaskPriority); - Assert.assertEquals(1, lockedIntervals.size()); - Assert.assertEquals( - Collections.singletonList( - Intervals.of("2017-05-01/2017-06-01")), - lockedIntervals.get(highPriorityTask.getDataSource()) - ); - } - 
@Test public void testFailedToReacquireTaskLock() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index e6dee0c7e403..4fdf206d449d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -61,6 +61,8 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.UOE; +import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; @@ -1055,44 +1057,56 @@ public void testGetTaskStatus() throws Exception } @Test - public void testGetLockedIntervals() throws Exception + public void testGetConflictingLockInfos() throws Exception { - final Map minTaskPriority = Collections.singletonMap("ds1", 0); - final Map> expectedLockedIntervals = Collections.singletonMap( + final List lockFilterPolicies = ImmutableList.of( + new LockFilterPolicy("ds1", 25, null) + ); + final Map> expectedLocks = Collections.singletonMap( "ds1", Arrays.asList( - Intervals.of("2012-01-01/2012-01-02"), - Intervals.of("2012-01-02/2012-01-03") + new TaskLockInfo( + "TIME_CHUNK", + "EXCLUSIVE", + 25, + Intervals.of("2012-01-01/2012-01-02") + ), + new TaskLockInfo( + "TIME_CHUNK", + "EXCLUSIVE", + 75, + Intervals.of("2012-01-01/2012-01-02") + ) ) ); - EasyMock.expect(taskLockbox.getLockedIntervals(minTaskPriority)) - .andReturn(expectedLockedIntervals); + EasyMock.expect(taskLockbox.getConflictingLockInfos(lockFilterPolicies)) + .andReturn(expectedLocks); replayAll(); - final Response response = 
overlordResource.getDatasourceLockedIntervals(minTaskPriority); + final Response response = overlordResource.getConflictingLockInfos(lockFilterPolicies); Assert.assertEquals(200, response.getStatus()); final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - Map> observedLockedIntervals = jsonMapper.readValue( + Map> observedLocks = jsonMapper.readValue( jsonMapper.writeValueAsString(response.getEntity()), - new TypeReference>>() + new TypeReference>>() { } ); - Assert.assertEquals(expectedLockedIntervals, observedLockedIntervals); + Assert.assertEquals(expectedLocks, observedLocks); } @Test - public void testGetLockedIntervalsWithEmptyBody() + public void testGetConflictingLockInfosWithEmptyBody() { replayAll(); - Response response = overlordResource.getDatasourceLockedIntervals(null); + Response response = overlordResource.getConflictingLockInfos(null); Assert.assertEquals(400, response.getStatus()); - response = overlordResource.getDatasourceLockedIntervals(Collections.emptyMap()); + response = overlordResource.getConflictingLockInfos(Collections.emptyList()); Assert.assertEquals(400, response.getStatus()); } diff --git a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java index 88ab4673aa8a..d4d2541c3696 100644 --- a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java +++ b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java @@ -21,8 +21,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -33,18 +36,18 @@ public class LockFilterPolicy { private final String datasource; private final int priority; - private final Map context; + private final List intervals; @JsonCreator public LockFilterPolicy( 
@JsonProperty("datasource") String datasource, @JsonProperty("priority") int priority, - @JsonProperty("context") Map context + @JsonProperty("intervals") @Nullable List intervals ) { this.datasource = datasource; this.priority = priority; - this.context = context == null ? Collections.emptyMap() : context; + this.intervals = intervals; } @JsonProperty @@ -59,10 +62,18 @@ public int getPriority() return priority; } + @Deprecated @JsonProperty public Map getContext() { - return context; + return Collections.emptyMap(); + } + + @Nullable + @JsonProperty + public List getIntervals() + { + return intervals; } @Override @@ -77,12 +88,12 @@ public boolean equals(Object o) LockFilterPolicy that = (LockFilterPolicy) o; return Objects.equals(datasource, that.datasource) && priority == that.priority - && Objects.equals(context, that.context); + && Objects.equals(intervals, that.intervals); } @Override public int hashCode() { - return Objects.hash(datasource, priority, context); + return Objects.hash(datasource, priority, intervals); } } diff --git a/server/src/main/java/org/apache/druid/metadata/TaskLockInfo.java b/server/src/main/java/org/apache/druid/metadata/TaskLockInfo.java new file mode 100644 index 000000000000..444ef5fb6ee8 --- /dev/null +++ b/server/src/main/java/org/apache/druid/metadata/TaskLockInfo.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Interval; + +import java.util.Objects; + +/** + * Contains information about an active task lock for a given datasource + */ +public class TaskLockInfo +{ + private final String granularity; + private final String type; + private final int priority; + private final Interval interval; + + @JsonCreator + public TaskLockInfo( + @JsonProperty("granularity") String granularity, + @JsonProperty("type") String type, + @JsonProperty("priority") int priority, + @JsonProperty("interval") Interval interval + ) + { + this.granularity = granularity; + this.type = type; + this.priority = priority; + this.interval = interval; + } + + @JsonProperty + public String getGranularity() + { + return granularity; + } + + @JsonProperty + public String getType() + { + return type; + } + + @JsonProperty + public int getPriority() + { + return priority; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskLockInfo that = (TaskLockInfo) o; + return Objects.equals(granularity, that.granularity) + && Objects.equals(type, that.type) + && priority == that.priority + && Objects.equals(interval, that.interval); + } + + @Override + public int hashCode() + { + return Objects.hash(granularity, type, priority, interval); + 
} + + @Override + public String toString() + { + return "TaskLockInfo{" + + "granularity=" + granularity + + ", type=" + type + + ", interval=" + interval + + ", priority=" + priority + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java index 310684206d28..925408664071 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java @@ -34,6 +34,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.rpc.ServiceRetryPolicy; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -180,15 +181,13 @@ ListenableFuture> taskStatuses( ListenableFuture> supervisorStatuses(); /** - * Returns a list of intervals locked by higher priority conflicting lock types + * Returns a list of Locks of higher priority with conflicting intervals * * @param lockFilterPolicies List of all filters for different datasources - * @return Map from datasource name to list of intervals locked by tasks that have a conflicting lock type with + * @return Map from datasource name to list of locks held by tasks that have conflicting intervals with * priority greater than or equal to the {@code minTaskPriority} for that datasource. */ - ListenableFuture>> findLockedIntervals( - List lockFilterPolicies - ); + ListenableFuture>> findConflictingLockInfos(List lockFilterPolicies); /** * Deletes pending segment records from the metadata store for a particular datasource. 
Records with diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java index 3e3d86ca5f25..b539f6955692 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java @@ -41,6 +41,7 @@ import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.rpc.IgnoreHttpResponseHandler; import org.apache.druid.rpc.RequestBuilder; import org.apache.druid.rpc.ServiceClient; @@ -190,11 +191,11 @@ public ListenableFuture taskStatus(final String taskId) } @Override - public ListenableFuture>> findLockedIntervals( + public ListenableFuture>> findConflictingLockInfos( List lockFilterPolicies ) { - final String path = "/druid/indexer/v1/lockedIntervals/v2"; + final String path = "/druid/indexer/v1/conflictingLocks"; return FutureUtils.transform( client.asyncRequest( @@ -203,10 +204,10 @@ public ListenableFuture>> findLockedIntervals( new BytesFullResponseHandler() ), holder -> { - final Map> response = JacksonUtils.readValue( + final Map> response = JacksonUtils.readValue( jsonMapper, holder.getContent(), - new TypeReference>>() {} + new TypeReference>>() {} ); return response == null ? 
Collections.emptyMap() : response; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 01f3bc77e9ee..8933ff53c370 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -46,6 +46,7 @@ import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; @@ -67,8 +68,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -272,16 +275,25 @@ private boolean cancelTaskIfGranularityChanged( * higher priority Task * */ - private Map> getLockedIntervals( + private Map> getLockedIntervals( List compactionConfigs ) { final List lockFilterPolicies = compactionConfigs .stream() - .map(config -> new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), config.getTaskContext())) + .map(config -> new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), null)) .collect(Collectors.toList()); - final Map> datasourceToLockedIntervals = - new HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(lockFilterPolicies), true)); + final Map> datasourceToLocks = + FutureUtils.getUnchecked(overlordClient.findConflictingLockInfos(lockFilterPolicies), true); + final Map> datasourceToLockedIntervals = new 
HashMap<>(); + for (Map.Entry> locks : datasourceToLocks.entrySet()) { + final String datasource = locks.getKey(); + datasourceToLockedIntervals.put(datasource, new HashSet<>()); + for (TaskLockInfo lock : locks.getValue()) { + datasourceToLockedIntervals.get(datasource) + .add(lock.getInterval()); + } + } LOG.debug( "Skipping the following intervals for Compaction as they are currently locked: %s", datasourceToLockedIntervals diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java index 10ebeb53af26..8832998239fe 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java @@ -26,6 +26,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.rpc.ServiceRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; import org.joda.time.Interval; @@ -97,7 +98,7 @@ public ListenableFuture> supervisorStatuses( } @Override - public ListenableFuture>> findLockedIntervals( + public ListenableFuture>> findConflictingLockInfos( List lockFilterPolicies ) { diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java index 8c3b867e3681..49e5495eae1c 100644 --- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -44,6 +44,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.metadata.LockFilterPolicy; +import 
org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.MockServiceClient; import org.apache.druid.rpc.RequestBuilder; @@ -53,7 +54,6 @@ import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpVersion; -import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -220,16 +220,26 @@ public void test_taskStatuses_null_null_zero() throws Exception } @Test - public void test_findLockedIntervals() throws Exception + public void test_findConflictingLockInfos() throws Exception { - final Map> lockMap = - ImmutableMap.of("foo", Collections.singletonList(Intervals.of("2000/2001"))); + final Map> lockMap = + ImmutableMap.of( + "foo", + Collections.singletonList( + new TaskLockInfo( + "TIME_CHUNK", + "EXCLUSIVE", + 50, + Intervals.of("2000/2001") + ) + ) + ); final List requests = ImmutableList.of( new LockFilterPolicy("foo", 3, null) ); serviceClient.expectAndRespond( - new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals/v2") + new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/conflictingLocks") .jsonContent(jsonMapper, requests), HttpResponseStatus.OK, ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), @@ -238,19 +248,19 @@ public void test_findLockedIntervals() throws Exception Assert.assertEquals( lockMap, - overlordClient.findLockedIntervals(requests).get() + overlordClient.findConflictingLockInfos(requests).get() ); } @Test - public void test_findLockedIntervals_nullReturn() throws Exception + public void test_findConflictingLockInfos_nullReturn() throws Exception { final List requests = ImmutableList.of( new LockFilterPolicy("foo", 3, null) ); serviceClient.expectAndRespond( - new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals/v2") + new RequestBuilder(HttpMethod.POST, 
"/druid/indexer/v1/conflictingLocks") .jsonContent(jsonMapper, requests), HttpResponseStatus.OK, ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), @@ -259,7 +269,7 @@ public void test_findLockedIntervals_nullReturn() throws Exception Assert.assertEquals( Collections.emptyMap(), - overlordClient.findLockedIntervals(requests).get() + overlordClient.findConflictingLockInfos(requests).get() ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 236cfaf7da54..f8c196de48f4 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -61,6 +61,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.filter.SelectorDimFilter; @@ -1137,7 +1138,7 @@ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask() .thenReturn( Futures.immediateFuture( CloseableIterators.withEmptyBaggage(ImmutableList.of(runningConflictCompactionTask).iterator()))); - Mockito.when(mockClient.findLockedIntervals(ArgumentMatchers.any())) + Mockito.when(mockClient.findConflictingLockInfos(ArgumentMatchers.any())) .thenReturn(Futures.immediateFuture(Collections.emptyMap())); Mockito.when(mockClient.cancelTask(conflictTaskId)) .thenReturn(Futures.immediateFuture(null)); @@ -1229,20 +1230,35 @@ public void testRunWithLockedIntervals() // Lock all intervals for dataSource_1 and dataSource_2 final String datasource1 = DATA_SOURCE_PREFIX + 1; - 
overlordClient.lockedIntervals + overlordClient.conflictingLockInfos .computeIfAbsent(datasource1, k -> new ArrayList<>()) - .add(Intervals.of("2017/2018")); + .add(new TaskLockInfo( + "TIME_CHUNK", + "EXCLUSIVE", + 50, + Intervals.of("2017/2018") + )); final String datasource2 = DATA_SOURCE_PREFIX + 2; - overlordClient.lockedIntervals + overlordClient.conflictingLockInfos .computeIfAbsent(datasource2, k -> new ArrayList<>()) - .add(Intervals.of("2017/2018")); + .add(new TaskLockInfo( + "TIME_CHUNK", + "EXCLUSIVE", + 50, + Intervals.of("2017/2018") + )); // Lock all intervals but one for dataSource_0 final String datasource0 = DATA_SOURCE_PREFIX + 0; - overlordClient.lockedIntervals + overlordClient.conflictingLockInfos .computeIfAbsent(datasource0, k -> new ArrayList<>()) - .add(Intervals.of("2017-01-01T13:00:00Z/2017-02-01")); + .add(new TaskLockInfo( + "TIME_CHUNK", + "EXCLUSIVE", + 50, + Intervals.of("2017-01-01T13:00:00Z/2017-02-01") + )); // Verify that locked intervals are skipped and only one compaction task // is submitted for dataSource_0 @@ -2026,7 +2042,7 @@ private class TestOverlordClient extends NoopOverlordClient private final ObjectMapper jsonMapper; // Map from Task Id to the intervals locked by that task - private final Map> lockedIntervals = new HashMap<>(); + private final Map> conflictingLockInfos = new HashMap<>(); // List of submitted compaction tasks for verification in the tests private final List submittedCompactionTasks = new ArrayList<>(); @@ -2069,11 +2085,11 @@ public ListenableFuture runTask(String taskId, Object taskObject) @Override - public ListenableFuture>> findLockedIntervals( + public ListenableFuture>> findConflictingLockInfos( List lockFilterPolicies ) { - return Futures.immediateFuture(lockedIntervals); + return Futures.immediateFuture(conflictingLockInfos); } @Override @@ -2262,7 +2278,7 @@ private static ArgumentCaptor setUpMockClient(final OverlordClient mockC final ArgumentCaptor payloadCaptor = 
ArgumentCaptor.forClass(Object.class); Mockito.when(mockClient.taskStatuses(null, null, 0)) .thenReturn(Futures.immediateFuture(CloseableIterators.withEmptyBaggage(Collections.emptyIterator()))); - Mockito.when(mockClient.findLockedIntervals(ArgumentMatchers.any())) + Mockito.when(mockClient.findConflictingLockInfos(ArgumentMatchers.any())) .thenReturn(Futures.immediateFuture(Collections.emptyMap())); Mockito.when(mockClient.getTotalWorkerCapacity()) .thenReturn(Futures.immediateFuture(new IndexingTotalWorkerCapacityInfo(0, 0))); From 3775cceb599404058db78a8a33a44a340189d23a Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 29 Jul 2024 10:30:57 +0530 Subject: [PATCH 2/6] Complete changes --- .../druid/indexing/overlord/TaskLockbox.java | 26 ++++ .../overlord/http/OverlordResource.java | 3 +- .../indexing/overlord/TaskLockboxTest.java | 135 +++++++++++++++++- .../overlord/http/OverlordResourceTest.java | 7 +- .../druid/testsEx/indexer/ITIndexerTest.java | 20 +-- .../clients/OverlordResourceTestClient.java | 14 +- .../ITAutoCompactionLockContentionTest.java | 16 ++- .../druid/tests/indexer/ITIndexerTest.java | 20 +-- .../druid/metadata/LockFilterPolicy.java | 13 +- .../rpc/indexing/OverlordClientImpl.java | 41 +++--- .../coordinator/duty/CompactSegments.java | 3 +- .../druid/server/http/TaskLockResponse.java | 74 ++++++++++ .../rpc/indexing/OverlordClientImplTest.java | 9 +- 13 files changed, 322 insertions(+), 59 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/http/TaskLockResponse.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index bb033aea9c08..aba22d45c107 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -40,6 +40,7 @@ import 
org.apache.druid.indexing.common.actions.SegmentAllocateResult; import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; @@ -50,6 +51,7 @@ import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.metadata.TaskLockInfo; +import org.apache.druid.query.QueryContexts; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -951,6 +953,27 @@ public Map> getConflictingLockInfos(List context = lockFilter.getContext(); + final boolean ignoreAppendLocks; + final Boolean useConcurrentLocks = QueryContexts.getAsBoolean( + Tasks.USE_CONCURRENT_LOCKS, + context.get(Tasks.USE_CONCURRENT_LOCKS) + ); + if (useConcurrentLocks == null) { + TaskLockType taskLockType = QueryContexts.getAsEnum( + Tasks.TASK_LOCK_TYPE, + context.get(Tasks.TASK_LOCK_TYPE), + TaskLockType.class + ); + if (taskLockType == null) { + ignoreAppendLocks = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; + } else { + ignoreAppendLocks = taskLockType == TaskLockType.APPEND; + } + } else { + ignoreAppendLocks = useConcurrentLocks; + } + running.get(datasource).forEach( (startTime, startTimeLocks) -> startTimeLocks.forEach( (interval, taskLockPosses) -> taskLockPosses.forEach( @@ -960,6 +983,9 @@ public Map> getConflictingLockInfos(List lockFilterPolicie } // Build the response - return Response.ok(taskQueryTool.getConflictingLockInfos(lockFilterPolicies)).build(); + return Response.ok(new TaskLockResponse(taskQueryTool.getConflictingLockInfos(lockFilterPolicies))).build(); } @GET diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index d3ce5b702bd8..36fb88015c69 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.druid.indexer.TaskStatus; @@ -41,6 +42,7 @@ import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.overlord.TaskLockbox.TaskLockPosse; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; @@ -1200,12 +1202,143 @@ public void testGetConflictingLockInfos() LockFilterPolicy policy = new LockFilterPolicy( "none", 50, - ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")) + ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")), + null + ); + + LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy( + "nonExistent", + 0, + null, + null + ); + + Map> conflictingLocks = + lockbox.getConflictingLockInfos(ImmutableList.of(policy, policyForNonExistentDatasource)); + Assert.assertEquals(1, conflictingLocks.size()); + Assert.assertEquals(expectedConflicts, new HashSet<>(conflictingLocks.get("none"))); + } + + @Test + public void testGetConflictingLockInfosWithAppendLockIgnoresAppendLocks() + { + final Set expectedConflicts = new HashSet<>(); + final TaskLockInfo overlappingReplaceLock = + 
validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50) + .toTaskLockInfo(); + expectedConflicts.add(overlappingReplaceLock); + + //Lower priority + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); + + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75); + + // Non-overlapping interval + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); + + final TaskLockInfo overlappingExclusiveLock = + validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50) + .toTaskLockInfo(); + expectedConflicts.add(overlappingExclusiveLock); + + LockFilterPolicy policy = new LockFilterPolicy( + "none", + 50, + ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")), + ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name()) + ); + + LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy( + "nonExistent", + 0, + null, + null + ); + + Map> conflictingLocks = + lockbox.getConflictingLockInfos(ImmutableList.of(policy, policyForNonExistentDatasource)); + Assert.assertEquals(1, conflictingLocks.size()); + Assert.assertEquals(expectedConflicts, new HashSet<>(conflictingLocks.get("none"))); + } + + @Test + public void testGetConflictingLockInfosWithConcurrentLocksIgnoresAppendLocks() + { + final Set expectedConflicts = new HashSet<>(); + final TaskLockInfo overlappingReplaceLock = + validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50) + .toTaskLockInfo(); + expectedConflicts.add(overlappingReplaceLock); + + //Lower priority + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); + + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75); + + // Non-overlapping interval + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); + + final TaskLockInfo 
overlappingExclusiveLock = + validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50) + .toTaskLockInfo(); + expectedConflicts.add(overlappingExclusiveLock); + + LockFilterPolicy policy = new LockFilterPolicy( + "none", + 50, + ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")), + ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true, Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE.name()) + ); + + LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy( + "nonExistent", + 0, + null, + null + ); + + Map> conflictingLocks = + lockbox.getConflictingLockInfos(ImmutableList.of(policy, policyForNonExistentDatasource)); + Assert.assertEquals(1, conflictingLocks.size()); + Assert.assertEquals(expectedConflicts, new HashSet<>(conflictingLocks.get("none"))); + } + + @Test + public void testGetConflictingLockInfosWithoutConcurrentLocksConsidersAppendLocks() + { + final Set expectedConflicts = new HashSet<>(); + final TaskLockInfo overlappingReplaceLock = + validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50) + .toTaskLockInfo(); + expectedConflicts.add(overlappingReplaceLock); + + //Lower priority + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); + + final TaskLockInfo overlappingAppendLock = + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75) + .toTaskLockInfo(); + expectedConflicts.add(overlappingAppendLock); + + // Non-overlapping interval + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); + + final TaskLockInfo overlappingExclusiveLock = + validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50) + .toTaskLockInfo(); + expectedConflicts.add(overlappingExclusiveLock); + + LockFilterPolicy policy = new LockFilterPolicy( + "none", + 50, + ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")), + 
ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, false, Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name()) ); LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy( "nonExistent", 0, + null, null ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 4fdf206d449d..88ca5cff8c7b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -68,6 +68,7 @@ import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.apache.druid.segment.TestHelper; +import org.apache.druid.server.http.TaskLockResponse; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthConfig; @@ -1060,7 +1061,7 @@ public void testGetTaskStatus() throws Exception public void testGetConflictingLockInfos() throws Exception { final List lockFilterPolicies = ImmutableList.of( - new LockFilterPolicy("ds1", 25, null) + new LockFilterPolicy("ds1", 25, null, null) ); final Map> expectedLocks = Collections.singletonMap( "ds1", @@ -1090,10 +1091,10 @@ public void testGetConflictingLockInfos() throws Exception final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); Map> observedLocks = jsonMapper.readValue( jsonMapper.writeValueAsString(response.getEntity()), - new TypeReference>>() + new TypeReference() { } - ); + ).getTaskLocks(); Assert.assertEquals(expectedLocks, observedLocks); } diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java index 65b8dc0b1ac0..c2048a026ade 100644 --- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java @@ -24,12 +24,13 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testsEx.categories.BatchIndex; import org.apache.druid.testsEx.config.DruidTestRunner; -import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -343,22 +344,23 @@ public void testGetLockedIntervals() throws Exception submitIndexTask(INDEX_TASK, datasourceName); // Wait until it acquires a lock - final Map minTaskPriority = Collections.singletonMap(datasourceName, 0); - final Map> lockedIntervals = new HashMap<>(); + final List lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null)); + final Map> locks = new HashMap<>(); ITRetryUtil.retryUntilFalse( () -> { - lockedIntervals.clear(); - lockedIntervals.putAll(indexer.getLockedIntervals(minTaskPriority)); - return lockedIntervals.isEmpty(); + locks.clear(); + locks.putAll(indexer.getConflictingLocks(lockFilterPolicies)); + return locks.isEmpty(); }, "Verify Intervals are Locked" ); // Verify the locked intervals for this datasource - Assert.assertEquals(lockedIntervals.size(), 1); + Assert.assertEquals(locks.size(), 1); + Assert.assertEquals(locks.get(datasourceName).size(), 1); Assert.assertEquals( - lockedIntervals.get(datasourceName), - Collections.singletonList(Intervals.of("2013-08-31/2013-09-02")) + 
locks.get(datasourceName).get(0).getInterval(), + Intervals.of("2013-08-31/2013-09-02") ); ITRetryUtil.retryUntilTrue( diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index 8167b9b64e12..2fe11297fd14 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -39,13 +39,15 @@ import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.segment.incremental.RowIngestionMetersTotals; +import org.apache.druid.server.http.TaskLockResponse; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.TestClient; import org.apache.druid.testing.utils.ITRetryUtil; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; -import org.joda.time.Interval; import java.net.URL; import java.util.ArrayList; @@ -334,13 +336,13 @@ public TaskReport.ReportMap getTaskReport(String taskId) } } - public Map> getLockedIntervals(Map minTaskPriority) + public Map> getConflictingLocks(List lockFilterPolicies) { try { - String jsonBody = jsonMapper.writeValueAsString(minTaskPriority); + String jsonBody = jsonMapper.writeValueAsString(lockFilterPolicies); StatusResponseHolder response = httpClient.go( - new Request(HttpMethod.POST, new URL(getIndexerURL() + "lockedIntervals")) + new Request(HttpMethod.POST, new URL(getIndexerURL() + "conflictingLocks")) .setContent( "application/json", StringUtils.toUtf8(jsonBody) @@ 
-349,10 +351,10 @@ public Map> getLockedIntervals(Map minTa ).get(); return jsonMapper.readValue( response.getContent(), - new TypeReference>>() + new TypeReference() { } - ); + ).getTaskLocks(); } catch (Exception e) { throw new RuntimeException(e); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java index 8d980d76f12a..07e567476820 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java @@ -24,6 +24,8 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -58,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * Integration Test to verify behaviour when there is a lock contention between @@ -265,15 +268,20 @@ private void ensureSegmentsLoaded() */ private void ensureLockedIntervals(Interval... 
intervals) { - final Map minTaskPriority = Collections.singletonMap(fullDatasourceName, 0); + final LockFilterPolicy lockFilterPolicy = new LockFilterPolicy(fullDatasourceName, 0, null, null); final List lockedIntervals = new ArrayList<>(); ITRetryUtil.retryUntilTrue( () -> { lockedIntervals.clear(); - Map> allIntervals = indexer.getLockedIntervals(minTaskPriority); - if (allIntervals.containsKey(fullDatasourceName)) { - lockedIntervals.addAll(allIntervals.get(fullDatasourceName)); + Map> locks = indexer.getConflictingLocks(Collections.singletonList(lockFilterPolicy)); + if (locks.containsKey(fullDatasourceName)) { + lockedIntervals.addAll( + locks.get(fullDatasourceName) + .stream() + .map(TaskLockInfo::getInterval) + .collect(Collectors.toList()) + ); } LOG.info("Locked intervals: %s", lockedIntervals); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java index f527135c80d3..df268eeda774 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java @@ -24,12 +24,13 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.tests.TestNGGroup; -import org.joda.time.Interval; import org.testng.Assert; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -342,22 +343,23 @@ public void testGetLockedIntervals() throws Exception 
submitIndexTask(INDEX_TASK, datasourceName); // Wait until it acquires a lock - final Map minTaskPriority = Collections.singletonMap(datasourceName, 0); - final Map> lockedIntervals = new HashMap<>(); + final List lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null)); + final Map> locks = new HashMap<>(); ITRetryUtil.retryUntilFalse( () -> { - lockedIntervals.clear(); - lockedIntervals.putAll(indexer.getLockedIntervals(minTaskPriority)); - return lockedIntervals.isEmpty(); + locks.clear(); + locks.putAll(indexer.getConflictingLocks(lockFilterPolicies)); + return locks.isEmpty(); }, "Verify Intervals are Locked" ); // Verify the locked intervals for this datasource - Assert.assertEquals(lockedIntervals.size(), 1); + Assert.assertEquals(locks.size(), 1); + Assert.assertEquals(locks.get(datasourceName).size(), 1); Assert.assertEquals( - lockedIntervals.get(datasourceName), - Collections.singletonList(Intervals.of("2013-08-31/2013-09-02")) + locks.get(datasourceName).get(0).getInterval(), + Intervals.of("2013-08-31/2013-09-02") ); ITRetryUtil.retryUntilTrue( diff --git a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java index d4d2541c3696..4cd0f548cd67 100644 --- a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java +++ b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java @@ -37,17 +37,20 @@ public class LockFilterPolicy private final String datasource; private final int priority; private final List intervals; + private final Map context; @JsonCreator public LockFilterPolicy( @JsonProperty("datasource") String datasource, @JsonProperty("priority") int priority, - @JsonProperty("intervals") @Nullable List intervals + @JsonProperty("intervals") @Nullable List intervals, + @JsonProperty("context") @Nullable Map context ) { this.datasource = datasource; this.priority = priority; this.intervals = 
intervals; + this.context = context == null ? Collections.emptyMap() : context; } @JsonProperty @@ -62,11 +65,10 @@ public int getPriority() return priority; } - @Deprecated @JsonProperty public Map getContext() { - return Collections.emptyMap(); + return context; } @Nullable @@ -88,12 +90,13 @@ public boolean equals(Object o) LockFilterPolicy that = (LockFilterPolicy) o; return Objects.equals(datasource, that.datasource) && priority == that.priority - && Objects.equals(intervals, that.intervals); + && Objects.equals(intervals, that.intervals) + && Objects.equals(context, that.context); } @Override public int hashCode() { - return Objects.hash(datasource, priority, intervals); + return Objects.hash(datasource, priority, intervals, context); } } diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java index b539f6955692..afb20631fc0e 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java @@ -46,6 +46,7 @@ import org.apache.druid.rpc.RequestBuilder; import org.apache.druid.rpc.ServiceClient; import org.apache.druid.rpc.ServiceRetryPolicy; +import org.apache.druid.server.http.TaskLockResponse; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.Interval; @@ -197,22 +198,30 @@ public ListenableFuture>> findConflictingLockInfo { final String path = "/druid/indexer/v1/conflictingLocks"; - return FutureUtils.transform( - client.asyncRequest( - new RequestBuilder(HttpMethod.POST, path) - .jsonContent(jsonMapper, lockFilterPolicies), - new BytesFullResponseHandler() - ), - holder -> { - final Map> response = JacksonUtils.readValue( - jsonMapper, - holder.getContent(), - new TypeReference>>() {} - ); - - return response == null ? 
Collections.emptyMap() : response; - } - ); + try { + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.POST, path) + .jsonContent(jsonMapper, lockFilterPolicies), + new BytesFullResponseHandler() + ), + holder -> { + final Map> response = JacksonUtils.readValue( + jsonMapper, + holder.getContent(), + new TypeReference() + { + } + ).getTaskLocks(); + + return response == null ? Collections.emptyMap() : response; + } + ); + } + catch (Exception e) { + // If there is an exception due to Overlord and Coordinator master version mismatch + return Futures.immediateFuture(Collections.emptyMap()); + } } @Override diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 8933ff53c370..b9bfe986df71 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -281,7 +281,8 @@ private Map> getLockedIntervals( { final List lockFilterPolicies = compactionConfigs .stream() - .map(config -> new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), null)) + .map(config -> + new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), null, config.getTaskContext())) .collect(Collectors.toList()); final Map> datasourceToLocks = FutureUtils.getUnchecked(overlordClient.findConflictingLockInfos(lockFilterPolicies), true); diff --git a/server/src/main/java/org/apache/druid/server/http/TaskLockResponse.java b/server/src/main/java/org/apache/druid/server/http/TaskLockResponse.java new file mode 100644 index 000000000000..97b4d1f8c06e --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/http/TaskLockResponse.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.metadata.TaskLockInfo; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class TaskLockResponse +{ + private final Map> taskLocks; + + @JsonCreator + public TaskLockResponse( + @JsonProperty("taskLocks") final Map> taskLocks + ) + { + this.taskLocks = taskLocks; + } + + @JsonProperty + public Map> getTaskLocks() + { + return taskLocks; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final TaskLockResponse that = (TaskLockResponse) o; + return Objects.equals(taskLocks, that.taskLocks); + } + + @Override + public int hashCode() + { + return Objects.hash(taskLocks); + } + + @Override + public String toString() + { + return "TaskLockResponse{" + + "taskLocks='" + taskLocks + + '}'; + } +} diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java index 49e5495eae1c..2d35399dfa45 100644 --- 
a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -48,6 +48,7 @@ import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.MockServiceClient; import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.server.http.TaskLockResponse; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; @@ -235,7 +236,7 @@ public void test_findConflictingLockInfos() throws Exception ) ); final List requests = ImmutableList.of( - new LockFilterPolicy("foo", 3, null) + new LockFilterPolicy("foo", 3, null, null) ); serviceClient.expectAndRespond( @@ -243,7 +244,7 @@ public void test_findConflictingLockInfos() throws Exception .jsonContent(jsonMapper, requests), HttpResponseStatus.OK, ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), - jsonMapper.writeValueAsBytes(lockMap) + jsonMapper.writeValueAsBytes(new TaskLockResponse(lockMap)) ); Assert.assertEquals( @@ -256,7 +257,7 @@ public void test_findConflictingLockInfos() throws Exception public void test_findConflictingLockInfos_nullReturn() throws Exception { final List requests = ImmutableList.of( - new LockFilterPolicy("foo", 3, null) + new LockFilterPolicy("foo", 3, null, null) ); serviceClient.expectAndRespond( @@ -264,7 +265,7 @@ public void test_findConflictingLockInfos_nullReturn() throws Exception .jsonContent(jsonMapper, requests), HttpResponseStatus.OK, ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), - jsonMapper.writeValueAsBytes(null) + jsonMapper.writeValueAsBytes(new TaskLockResponse(null)) ); Assert.assertEquals( From e01bf3653bcdc4d63ab4d1e7bde72e8ba4238322 Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 29 Jul 2024 13:34:47 +0530 Subject: [PATCH 3/6] Revert unneeded changes --- .../druid/indexing/common/TaskLock.java | 6 - 
.../indexing/common/actions/TaskLocks.java | 11 ++ .../druid/indexing/overlord/TaskLockbox.java | 79 +++++++- .../indexing/overlord/TaskQueryTool.java | 17 +- .../overlord/http/OverlordResource.java | 20 +- .../indexing/overlord/TaskLockboxTest.java | 182 ++++++++++++++---- .../overlord/http/OverlordResourceTest.java | 68 +++++-- .../druid/testsEx/indexer/ITIndexerTest.java | 17 +- .../clients/OverlordResourceTestClient.java | 11 +- .../ITAutoCompactionLockContentionTest.java | 15 +- .../druid/tests/indexer/ITIndexerTest.java | 17 +- ...kLockInfo.java => ActiveTaskLockInfo.java} | 8 +- .../druid/rpc/indexing/OverlordClient.java | 9 +- .../rpc/indexing/OverlordClientImpl.java | 48 ++--- .../coordinator/duty/CompactSegments.java | 18 +- .../druid/server/http/TaskLockResponse.java | 34 +--- .../client/indexing/NoopOverlordClient.java | 3 +- .../rpc/indexing/OverlordClientImplTest.java | 33 ++-- .../coordinator/duty/CompactSegmentsTest.java | 38 ++-- 19 files changed, 402 insertions(+), 232 deletions(-) rename server/src/main/java/org/apache/druid/metadata/{TaskLockInfo.java => ActiveTaskLockInfo.java} (94%) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java index e3f5c0a42809..eb96eb4fed91 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLock.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.overlord.LockRequest; -import org.apache.druid.metadata.TaskLockInfo; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -78,9 +77,4 @@ default void assertNotRevoked() .build("Lock of type[%s] for interval[%s] was revoked", getType(), getInterval()); } } - - default TaskLockInfo toTaskLockInfo() - { - return new 
TaskLockInfo(getGranularity().name(), getType().name(), getNonNullPriority(), getInterval()); - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java index aeecba0e289d..a875ff1d457a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java @@ -31,6 +31,7 @@ import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.metadata.ActiveTaskLockInfo; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.timeline.DataSegment; import org.joda.time.DateTime; @@ -253,4 +254,14 @@ private static NavigableMap> getTaskLockMap(TaskLockbox .add(taskLock)); return taskLockMap; } + + public static ActiveTaskLockInfo toLockInfo(TaskLock taskLock) + { + return new ActiveTaskLockInfo( + taskLock.getGranularity().name(), + taskLock.getType().name(), + taskLock.getNonNullPriority(), + taskLock.getInterval() + ); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index aba22d45c107..1165604147e6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -38,6 +38,7 @@ import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.actions.SegmentAllocateRequest; import org.apache.druid.indexing.common.actions.SegmentAllocateResult; +import org.apache.druid.indexing.common.actions.TaskLocks; import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import 
org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Tasks; @@ -48,9 +49,9 @@ import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.ActiveTaskLockInfo; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.ReplaceTaskLock; -import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.query.QueryContexts; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.joda.time.DateTime; @@ -928,11 +929,79 @@ private Set getNonRevokedReplaceLocks(List posse /** * @param lockFilterPolicies Lock filters for the given datasources - * @return Map from datasource to list of non-revoked lock infos with at least as much priority and an overlapping interval + * @return Map from datasource to intervals locked by tasks satisfying the lock filter condititions */ - public Map> getConflictingLockInfos(List lockFilterPolicies) + public Map> getLockedIntervals(List lockFilterPolicies) { - final Map> datasourceToLocks = new HashMap<>(); + final Map> datasourceToIntervals = new HashMap<>(); + + // Take a lock and populate the maps + giant.lock(); + + try { + lockFilterPolicies.forEach( + lockFilter -> { + final String datasource = lockFilter.getDatasource(); + if (!running.containsKey(datasource)) { + return; + } + + final int priority = lockFilter.getPriority(); + final boolean isReplaceLock = TaskLockType.REPLACE.name().equals( + lockFilter.getContext().getOrDefault( + Tasks.TASK_LOCK_TYPE, + Tasks.DEFAULT_TASK_LOCK_TYPE + ) + ); + final boolean isUsingConcurrentLocks = Boolean.TRUE.equals( + lockFilter.getContext().getOrDefault( + Tasks.USE_CONCURRENT_LOCKS, + Tasks.DEFAULT_USE_CONCURRENT_LOCKS + ) + ); + final boolean ignoreAppendLocks = isUsingConcurrentLocks || isReplaceLock; + + running.get(datasource).forEach( + (startTime, 
startTimeLocks) -> startTimeLocks.forEach( + (interval, taskLockPosses) -> taskLockPosses.forEach( + taskLockPosse -> { + if (taskLockPosse.getTaskLock().isRevoked()) { + // do nothing + } else if (ignoreAppendLocks + && TaskLockType.APPEND.equals(taskLockPosse.getTaskLock().getType())) { + // do nothing + } else if (taskLockPosse.getTaskLock().getPriority() == null + || taskLockPosse.getTaskLock().getPriority() < priority) { + // do nothing + } else { + datasourceToIntervals.computeIfAbsent(datasource, k -> new HashSet<>()) + .add(interval); + } + } + ) + ) + ); + } + ); + } + finally { + giant.unlock(); + } + + return datasourceToIntervals.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> new ArrayList<>(entry.getValue()) + )); + } + + /** + * @param lockFilterPolicies Lock filters for the given datasources + * @return Map from datasource to list of non-revoked locks with at least as much priority and an overlapping interval + */ + public Map> getActiveLocks(List lockFilterPolicies) + { + final Map> datasourceToLocks = new HashMap<>(); // Take a lock and populate the maps giant.lock(); @@ -990,7 +1059,7 @@ public Map> getConflictingLockInfos(List new ArrayList<>()) - .add(taskLockPosse.getTaskLock().toTaskLockInfo()); + .add(TaskLocks.toLockInfo(taskLockPosse.getTaskLock())); break; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java index e5dab8526a91..c0462887dabd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -37,8 +37,8 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.ActiveTaskLockInfo; import 
org.apache.druid.metadata.LockFilterPolicy; -import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.joda.time.Duration; @@ -86,12 +86,21 @@ public TaskQueryTool( } /** - * @param lockFilterPolicies Requests for conflicing locks for various datasources + * @param lockFilterPolicies Requests for conflicing lock intervals for various datasources + * @return Map from datasource to intervals locked by tasks that have a conflicting lock type that cannot be revoked + */ + public Map> getLockedIntervals(List lockFilterPolicies) + { + return taskLockbox.getLockedIntervals(lockFilterPolicies); + } + + /** + * @param lockFilterPolicies Requests for active locks for various datasources * @return Map from datasource to conflicting lock infos */ - public Map> getConflictingLockInfos(List lockFilterPolicies) + public Map> getActiveLocks(List lockFilterPolicies) { - return taskLockbox.getConflictingLockInfos(lockFilterPolicies); + return taskLockbox.getActiveLocks(lockFilterPolicies); } public List> getActiveTaskInfo(@Nullable String dataSource) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 49501f24b7b7..727e2d7e7d54 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -243,17 +243,31 @@ public Response isLeader() } @POST - @Path("/conflictingLocks") + @Path("/lockedIntervals/v2") @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(StateResourceFilter.class) - public Response getConflictingLockInfos(List lockFilterPolicies) + public Response getDatasourceLockedIntervals(List lockFilterPolicies) { if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) { 
return Response.status(Status.BAD_REQUEST).entity("No filter provided").build(); } // Build the response - return Response.ok(new TaskLockResponse(taskQueryTool.getConflictingLockInfos(lockFilterPolicies))).build(); + return Response.ok(taskQueryTool.getLockedIntervals(lockFilterPolicies)).build(); + } + + @POST + @Path("/activeLocks") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(StateResourceFilter.class) + public Response getActiveLocks(List lockFilterPolicies) + { + if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) { + return Response.status(Status.BAD_REQUEST).entity("No filter provided").build(); + } + + // Build the response + return Response.ok(new TaskLockResponse(taskQueryTool.getActiveLocks(lockFilterPolicies))).build(); } @GET diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 36fb88015c69..77b473136631 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -37,6 +37,7 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TimeChunkLock; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.actions.TaskLocks; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.AbstractTask; @@ -51,11 +52,11 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.metadata.ActiveTaskLockInfo; import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory; import 
org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.MetadataStorageTablesConfig; -import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; @@ -1175,28 +1176,134 @@ public void testGetTimeChunkAndSegmentLockForDifferentGroup() } @Test - public void testGetConflictingLockInfos() + public void testGetLockedIntervalsForHigherPriorityExclusiveLock() { - final Set expectedConflicts = new HashSet<>(); - final TaskLockInfo overlappingReplaceLock = + final Task task = NoopTask.ofPriority(50); + lockbox.add(task); + taskStorage.insert(task, TaskStatus.running(task.getId())); + tryTimeChunkLock( + TaskLockType.APPEND, + task, + Intervals.of("2017/2018") + ); + + LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy( + task.getDataSource(), + 75, + null, + null + ); + + Map> conflictingIntervals = + lockbox.getLockedIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock)); + Assert.assertTrue(conflictingIntervals.isEmpty()); + } + + @Test + public void testGetLockedIntervalsForLowerPriorityExclusiveLock() + { + final Task task = NoopTask.ofPriority(50); + lockbox.add(task); + taskStorage.insert(task, TaskStatus.running(task.getId())); + tryTimeChunkLock( + TaskLockType.APPEND, + task, + Intervals.of("2017/2018") + ); + + LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy( + task.getDataSource(), + 25, + null, + null + ); + + Map> conflictingIntervals = + lockbox.getLockedIntervals(ImmutableList.of(requestForExclusiveLowerPriorityLock)); + Assert.assertEquals(1, conflictingIntervals.size()); + Assert.assertEquals( + Collections.singletonList(Intervals.of("2017/2018")), + conflictingIntervals.get(task.getDataSource()) + ); + } + + @Test + public void 
testGetLockedIntervalsForLowerPriorityReplaceLock() + { + final Task task = NoopTask.ofPriority(50); + lockbox.add(task); + taskStorage.insert(task, TaskStatus.running(task.getId())); + tryTimeChunkLock( + TaskLockType.APPEND, + task, + Intervals.of("2017/2018") + ); + + LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy( + task.getDataSource(), + 25, + null, + ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.REPLACE.name()) + ); + + Map> conflictingIntervals = + lockbox.getLockedIntervals(ImmutableList.of(requestForReplaceLowerPriorityLock)); + Assert.assertTrue(conflictingIntervals.isEmpty()); + } + + @Test + public void testGetLockedIntervalsForLowerPriorityUseConcurrentLocks() + { + final Task task = NoopTask.ofPriority(50); + lockbox.add(task); + taskStorage.insert(task, TaskStatus.running(task.getId())); + tryTimeChunkLock( + TaskLockType.APPEND, + task, + Intervals.of("2017/2018") + ); + + LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy( + task.getDataSource(), + 25, + null, + ImmutableMap.of( + Tasks.TASK_LOCK_TYPE, + TaskLockType.EXCLUSIVE.name(), + Tasks.USE_CONCURRENT_LOCKS, + true + ) + ); + + Map> conflictingIntervals = + lockbox.getLockedIntervals(ImmutableList.of(requestForReplaceLowerPriorityLock)); + Assert.assertTrue(conflictingIntervals.isEmpty()); + } + + + @Test + public void testGetActiveLocks() + { + final Set expectedConflicts = new HashSet<>(); + final ActiveTaskLockInfo overlappingReplaceLock = TaskLocks.toLockInfo( validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50) - .toTaskLockInfo(); + ); expectedConflicts.add(overlappingReplaceLock); //Lower priority validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); - final TaskLockInfo overlappingAppendLock = + final ActiveTaskLockInfo overlappingAppendLock = TaskLocks.toLockInfo( validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75) - 
.toTaskLockInfo(); + ); expectedConflicts.add(overlappingAppendLock); // Non-overlapping interval validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); - final TaskLockInfo overlappingExclusiveLock = + final ActiveTaskLockInfo overlappingExclusiveLock = TaskLocks.toLockInfo( validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50) - .toTaskLockInfo(); + ); expectedConflicts.add(overlappingExclusiveLock); LockFilterPolicy policy = new LockFilterPolicy( @@ -1213,19 +1320,19 @@ public void testGetConflictingLockInfos() null ); - Map> conflictingLocks = - lockbox.getConflictingLockInfos(ImmutableList.of(policy, policyForNonExistentDatasource)); + Map> conflictingLocks = + lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource)); Assert.assertEquals(1, conflictingLocks.size()); Assert.assertEquals(expectedConflicts, new HashSet<>(conflictingLocks.get("none"))); } @Test - public void testGetConflictingLockInfosWithAppendLockIgnoresAppendLocks() + public void testGetActiveLocksWithAppendLockIgnoresAppendLocks() { - final Set expectedConflicts = new HashSet<>(); - final TaskLockInfo overlappingReplaceLock = + final Set expectedConflicts = new HashSet<>(); + final ActiveTaskLockInfo overlappingReplaceLock = TaskLocks.toLockInfo( validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50) - .toTaskLockInfo(); + ); expectedConflicts.add(overlappingReplaceLock); //Lower priority @@ -1236,9 +1343,9 @@ public void testGetConflictingLockInfosWithAppendLockIgnoresAppendLocks() // Non-overlapping interval validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); - final TaskLockInfo overlappingExclusiveLock = + final ActiveTaskLockInfo overlappingExclusiveLock = TaskLocks.toLockInfo( validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50) - .toTaskLockInfo(); + ); expectedConflicts.add(overlappingExclusiveLock); 
LockFilterPolicy policy = new LockFilterPolicy( @@ -1255,19 +1362,19 @@ public void testGetConflictingLockInfosWithAppendLockIgnoresAppendLocks() null ); - Map> conflictingLocks = - lockbox.getConflictingLockInfos(ImmutableList.of(policy, policyForNonExistentDatasource)); + Map> conflictingLocks = + lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource)); Assert.assertEquals(1, conflictingLocks.size()); Assert.assertEquals(expectedConflicts, new HashSet<>(conflictingLocks.get("none"))); } @Test - public void testGetConflictingLockInfosWithConcurrentLocksIgnoresAppendLocks() + public void testGetActiveLocksWithConcurrentLocksIgnoresAppendLocks() { - final Set expectedConflicts = new HashSet<>(); - final TaskLockInfo overlappingReplaceLock = + final Set expectedConflicts = new HashSet<>(); + final ActiveTaskLockInfo overlappingReplaceLock = TaskLocks.toLockInfo( validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50) - .toTaskLockInfo(); + ); expectedConflicts.add(overlappingReplaceLock); //Lower priority @@ -1278,9 +1385,9 @@ public void testGetConflictingLockInfosWithConcurrentLocksIgnoresAppendLocks() // Non-overlapping interval validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); - final TaskLockInfo overlappingExclusiveLock = + final ActiveTaskLockInfo overlappingExclusiveLock = TaskLocks.toLockInfo( validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50) - .toTaskLockInfo(); + ); expectedConflicts.add(overlappingExclusiveLock); LockFilterPolicy policy = new LockFilterPolicy( @@ -1297,35 +1404,36 @@ public void testGetConflictingLockInfosWithConcurrentLocksIgnoresAppendLocks() null ); - Map> conflictingLocks = - lockbox.getConflictingLockInfos(ImmutableList.of(policy, policyForNonExistentDatasource)); + Map> conflictingLocks = + lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource)); Assert.assertEquals(1, 
conflictingLocks.size()); Assert.assertEquals(expectedConflicts, new HashSet<>(conflictingLocks.get("none"))); } @Test - public void testGetConflictingLockInfosWithoutConcurrentLocksConsidersAppendLocks() + public void testGetActiveLocksWithoutConcurrentLocksConsidersAppendLocks() { - final Set expectedConflicts = new HashSet<>(); - final TaskLockInfo overlappingReplaceLock = + final Set expectedConflicts = new HashSet<>(); + final ActiveTaskLockInfo overlappingReplaceLock = TaskLocks.toLockInfo( validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50) - .toTaskLockInfo(); + ); + expectedConflicts.add(overlappingReplaceLock); //Lower priority validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); - final TaskLockInfo overlappingAppendLock = + final ActiveTaskLockInfo overlappingAppendLock = TaskLocks.toLockInfo( validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75) - .toTaskLockInfo(); + ); expectedConflicts.add(overlappingAppendLock); // Non-overlapping interval validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); - final TaskLockInfo overlappingExclusiveLock = + final ActiveTaskLockInfo overlappingExclusiveLock = TaskLocks.toLockInfo( validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50) - .toTaskLockInfo(); + ); expectedConflicts.add(overlappingExclusiveLock); LockFilterPolicy policy = new LockFilterPolicy( @@ -1342,8 +1450,8 @@ public void testGetConflictingLockInfosWithoutConcurrentLocksConsidersAppendLock null ); - Map> conflictingLocks = - lockbox.getConflictingLockInfos(ImmutableList.of(policy, policyForNonExistentDatasource)); + Map> conflictingLocks = + lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource)); Assert.assertEquals(1, conflictingLocks.size()); Assert.assertEquals(expectedConflicts, new HashSet<>(conflictingLocks.get("none"))); } diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 88ca5cff8c7b..27f8ea739ee3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -61,8 +61,8 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.UOE; +import org.apache.druid.metadata.ActiveTaskLockInfo; import org.apache.druid.metadata.LockFilterPolicy; -import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; @@ -1058,21 +1058,65 @@ public void testGetTaskStatus() throws Exception } @Test - public void testGetConflictingLockInfos() throws Exception + public void testGetLockedIntervals() throws Exception { final List lockFilterPolicies = ImmutableList.of( new LockFilterPolicy("ds1", 25, null, null) ); - final Map> expectedLocks = Collections.singletonMap( + final Map> expectedIntervals = Collections.singletonMap( "ds1", Arrays.asList( - new TaskLockInfo( + Intervals.of("2012-01-01/2012-01-02"), + Intervals.of("2012-01-01/2012-01-02") + ) + ); + + EasyMock.expect(taskLockbox.getLockedIntervals(lockFilterPolicies)) + .andReturn(expectedIntervals); + replayAll(); + + final Response response = overlordResource.getDatasourceLockedIntervals(lockFilterPolicies); + Assert.assertEquals(200, response.getStatus()); + + final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + Map> observedIntervals = jsonMapper.readValue( + jsonMapper.writeValueAsString(response.getEntity()), + new TypeReference>>() + { + } + ); + + Assert.assertEquals(expectedIntervals, 
observedIntervals); + } + + @Test + public void testGetLockedIntervalsWithEmptyBody() + { + replayAll(); + + Response response = overlordResource.getDatasourceLockedIntervals(null); + Assert.assertEquals(400, response.getStatus()); + + response = overlordResource.getDatasourceLockedIntervals(Collections.emptyList()); + Assert.assertEquals(400, response.getStatus()); + } + + @Test + public void testGetActiveLocks() throws Exception + { + final List lockFilterPolicies = ImmutableList.of( + new LockFilterPolicy("ds1", 25, null, null) + ); + final Map> expectedLocks = Collections.singletonMap( + "ds1", + Arrays.asList( + new ActiveTaskLockInfo( "TIME_CHUNK", "EXCLUSIVE", 25, Intervals.of("2012-01-01/2012-01-02") ), - new TaskLockInfo( + new ActiveTaskLockInfo( "TIME_CHUNK", "EXCLUSIVE", 75, @@ -1081,33 +1125,33 @@ public void testGetConflictingLockInfos() throws Exception ) ); - EasyMock.expect(taskLockbox.getConflictingLockInfos(lockFilterPolicies)) + EasyMock.expect(taskLockbox.getActiveLocks(lockFilterPolicies)) .andReturn(expectedLocks); replayAll(); - final Response response = overlordResource.getConflictingLockInfos(lockFilterPolicies); + final Response response = overlordResource.getActiveLocks(lockFilterPolicies); Assert.assertEquals(200, response.getStatus()); final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - Map> observedLocks = jsonMapper.readValue( + Map> observedLocks = jsonMapper.readValue( jsonMapper.writeValueAsString(response.getEntity()), new TypeReference() { } - ).getTaskLocks(); + ).getDatasourceToLocks(); Assert.assertEquals(expectedLocks, observedLocks); } @Test - public void testGetConflictingLockInfosWithEmptyBody() + public void testGetActiveLocksWithEmptyBody() { replayAll(); - Response response = overlordResource.getConflictingLockInfos(null); + Response response = overlordResource.getActiveLocks(null); Assert.assertEquals(400, response.getStatus()); - response = overlordResource.getConflictingLockInfos(Collections.emptyList()); 
+ response = overlordResource.getActiveLocks(Collections.emptyList()); Assert.assertEquals(400, response.getStatus()); } diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java index c2048a026ade..84f2dff1d79d 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITIndexerTest.java @@ -25,12 +25,12 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.LockFilterPolicy; -import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testsEx.categories.BatchIndex; import org.apache.druid.testsEx.config.DruidTestRunner; +import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -345,22 +345,21 @@ public void testGetLockedIntervals() throws Exception // Wait until it acquires a lock final List lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null)); - final Map> locks = new HashMap<>(); + final Map> lockedIntervals = new HashMap<>(); ITRetryUtil.retryUntilFalse( () -> { - locks.clear(); - locks.putAll(indexer.getConflictingLocks(lockFilterPolicies)); - return locks.isEmpty(); + lockedIntervals.clear(); + lockedIntervals.putAll(indexer.getLockedIntervals(lockFilterPolicies)); + return lockedIntervals.isEmpty(); }, "Verify Intervals are Locked" ); // Verify the locked intervals for this datasource - Assert.assertEquals(locks.size(), 1); - Assert.assertEquals(locks.get(datasourceName).size(), 1); + 
Assert.assertEquals(lockedIntervals.size(), 1); Assert.assertEquals( - locks.get(datasourceName).get(0).getInterval(), - Intervals.of("2013-08-31/2013-09-02") + lockedIntervals.get(datasourceName), + Collections.singletonList(Intervals.of("2013-08-31/2013-09-02")) ); ITRetryUtil.retryUntilTrue( diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index 2fe11297fd14..f75dc6043f9d 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -40,14 +40,13 @@ import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.metadata.LockFilterPolicy; -import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.segment.incremental.RowIngestionMetersTotals; -import org.apache.druid.server.http.TaskLockResponse; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.TestClient; import org.apache.druid.testing.utils.ITRetryUtil; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Interval; import java.net.URL; import java.util.ArrayList; @@ -336,13 +335,13 @@ public TaskReport.ReportMap getTaskReport(String taskId) } } - public Map> getConflictingLocks(List lockFilterPolicies) + public Map> getLockedIntervals(List lockFilterPolicies) { try { String jsonBody = jsonMapper.writeValueAsString(lockFilterPolicies); StatusResponseHolder response = httpClient.go( - new Request(HttpMethod.POST, new URL(getIndexerURL() + "conflictingLocks")) + new Request(HttpMethod.POST, new URL(getIndexerURL() + "lockedIntervals/v2")) 
.setContent( "application/json", StringUtils.toUtf8(jsonBody) @@ -351,10 +350,10 @@ public Map> getConflictingLocks(List() + new TypeReference>>() { } - ).getTaskLocks(); + ); } catch (Exception e) { throw new RuntimeException(e); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java index 07e567476820..79d63cb4550a 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java @@ -19,13 +19,13 @@ package org.apache.druid.tests.coordinator.duty; +import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.LockFilterPolicy; -import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -55,12 +55,10 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; /** * Integration Test to verify behaviour when there is a lock contention between @@ -274,14 +272,9 @@ private void ensureLockedIntervals(Interval... 
intervals) () -> { lockedIntervals.clear(); - Map> locks = indexer.getConflictingLocks(Collections.singletonList(lockFilterPolicy)); - if (locks.containsKey(fullDatasourceName)) { - lockedIntervals.addAll( - locks.get(fullDatasourceName) - .stream() - .map(TaskLockInfo::getInterval) - .collect(Collectors.toList()) - ); + Map> allIntervals = indexer.getLockedIntervals(ImmutableList.of(lockFilterPolicy)); + if (allIntervals.containsKey(fullDatasourceName)) { + lockedIntervals.addAll(allIntervals.get(fullDatasourceName)); } LOG.info("Locked intervals: %s", lockedIntervals); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java index df268eeda774..dfe308e2c1b1 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java @@ -25,12 +25,12 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.LockFilterPolicy; -import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.tests.TestNGGroup; +import org.joda.time.Interval; import org.testng.Assert; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -344,22 +344,21 @@ public void testGetLockedIntervals() throws Exception // Wait until it acquires a lock final List lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null)); - final Map> locks = new HashMap<>(); + final Map> lockedIntervals = new HashMap<>(); ITRetryUtil.retryUntilFalse( () -> { - locks.clear(); - 
locks.putAll(indexer.getConflictingLocks(lockFilterPolicies)); - return locks.isEmpty(); + lockedIntervals.clear(); + lockedIntervals.putAll(indexer.getLockedIntervals(lockFilterPolicies)); + return lockedIntervals.isEmpty(); }, "Verify Intervals are Locked" ); // Verify the locked intervals for this datasource - Assert.assertEquals(locks.size(), 1); - Assert.assertEquals(locks.get(datasourceName).size(), 1); + Assert.assertEquals(lockedIntervals.size(), 1); Assert.assertEquals( - locks.get(datasourceName).get(0).getInterval(), - Intervals.of("2013-08-31/2013-09-02") + lockedIntervals.get(datasourceName), + Collections.singletonList(Intervals.of("2013-08-31/2013-09-02")) ); ITRetryUtil.retryUntilTrue( diff --git a/server/src/main/java/org/apache/druid/metadata/TaskLockInfo.java b/server/src/main/java/org/apache/druid/metadata/ActiveTaskLockInfo.java similarity index 94% rename from server/src/main/java/org/apache/druid/metadata/TaskLockInfo.java rename to server/src/main/java/org/apache/druid/metadata/ActiveTaskLockInfo.java index 444ef5fb6ee8..07fda10ff74a 100644 --- a/server/src/main/java/org/apache/druid/metadata/TaskLockInfo.java +++ b/server/src/main/java/org/apache/druid/metadata/ActiveTaskLockInfo.java @@ -28,7 +28,7 @@ /** * Contains information about an active task lock for a given datasource */ -public class TaskLockInfo +public class ActiveTaskLockInfo { private final String granularity; private final String type; @@ -36,7 +36,7 @@ public class TaskLockInfo private final Interval interval; @JsonCreator - public TaskLockInfo( + public ActiveTaskLockInfo( @JsonProperty("granularity") String granularity, @JsonProperty("type") String type, @JsonProperty("priority") int priority, @@ -82,7 +82,7 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - TaskLockInfo that = (TaskLockInfo) o; + ActiveTaskLockInfo that = (ActiveTaskLockInfo) o; return Objects.equals(granularity, that.granularity) && Objects.equals(type, 
that.type) && priority == that.priority @@ -98,7 +98,7 @@ public int hashCode() @Override public String toString() { - return "TaskLockInfo{" + + return "ActiveTaskLockInfo{" + "granularity=" + granularity + ", type=" + type + ", interval=" + interval + diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java index 925408664071..310684206d28 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java @@ -34,7 +34,6 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.LockFilterPolicy; -import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.rpc.ServiceRetryPolicy; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -181,13 +180,15 @@ ListenableFuture> taskStatuses( ListenableFuture> supervisorStatuses(); /** - * Returns a list of Locks of higher priority with conflicting intervals + * Returns a list of intervals locked by higher priority conflicting lock types * * @param lockFilterPolicies List of all filters for different datasources - * @return Map from datasource name to list of locks held by tasks that have conflicting intervals with + * @return Map from datasource name to list of intervals locked by tasks that have a conflicting lock type with * priority greater than or equal to the {@code minTaskPriority} for that datasource. */ - ListenableFuture>> findConflictingLockInfos(List lockFilterPolicies); + ListenableFuture>> findLockedIntervals( + List lockFilterPolicies + ); /** * Deletes pending segment records from the metadata store for a particular datasource. 
Records with diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java index afb20631fc0e..3e3d86ca5f25 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java @@ -41,12 +41,10 @@ import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; import org.apache.druid.metadata.LockFilterPolicy; -import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.rpc.IgnoreHttpResponseHandler; import org.apache.druid.rpc.RequestBuilder; import org.apache.druid.rpc.ServiceClient; import org.apache.druid.rpc.ServiceRetryPolicy; -import org.apache.druid.server.http.TaskLockResponse; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.Interval; @@ -192,36 +190,28 @@ public ListenableFuture taskStatus(final String taskId) } @Override - public ListenableFuture>> findConflictingLockInfos( + public ListenableFuture>> findLockedIntervals( List lockFilterPolicies ) { - final String path = "/druid/indexer/v1/conflictingLocks"; - - try { - return FutureUtils.transform( - client.asyncRequest( - new RequestBuilder(HttpMethod.POST, path) - .jsonContent(jsonMapper, lockFilterPolicies), - new BytesFullResponseHandler() - ), - holder -> { - final Map> response = JacksonUtils.readValue( - jsonMapper, - holder.getContent(), - new TypeReference() - { - } - ).getTaskLocks(); - - return response == null ? 
Collections.emptyMap() : response; - } - ); - } - catch (Exception e) { - // If there is an exception due to Overlord and Coordinator master version mismatch - return Futures.immediateFuture(Collections.emptyMap()); - } + final String path = "/druid/indexer/v1/lockedIntervals/v2"; + + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.POST, path) + .jsonContent(jsonMapper, lockFilterPolicies), + new BytesFullResponseHandler() + ), + holder -> { + final Map> response = JacksonUtils.readValue( + jsonMapper, + holder.getContent(), + new TypeReference>>() {} + ); + + return response == null ? Collections.emptyMap() : response; + } + ); } @Override diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index b9bfe986df71..d6e25fc9ac69 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -46,7 +46,6 @@ import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.LockFilterPolicy; -import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; @@ -68,10 +67,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -275,7 +272,7 @@ private boolean cancelTaskIfGranularityChanged( * higher priority Task * */ - private Map> getLockedIntervals( + private Map> getLockedIntervals( 
List compactionConfigs ) { @@ -284,17 +281,8 @@ private Map> getLockedIntervals( .map(config -> new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), null, config.getTaskContext())) .collect(Collectors.toList()); - final Map> datasourceToLocks = - FutureUtils.getUnchecked(overlordClient.findConflictingLockInfos(lockFilterPolicies), true); - final Map> datasourceToLockedIntervals = new HashMap<>(); - for (Map.Entry> locks : datasourceToLocks.entrySet()) { - final String datasource = locks.getKey(); - datasourceToLockedIntervals.put(datasource, new HashSet<>()); - for (TaskLockInfo lock : locks.getValue()) { - datasourceToLockedIntervals.get(datasource) - .add(lock.getInterval()); - } - } + final Map> datasourceToLockedIntervals = + new HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(lockFilterPolicies), true)); LOG.debug( "Skipping the following intervals for Compaction as they are currently locked: %s", datasourceToLockedIntervals diff --git a/server/src/main/java/org/apache/druid/server/http/TaskLockResponse.java b/server/src/main/java/org/apache/druid/server/http/TaskLockResponse.java index 97b4d1f8c06e..1e91a09e93ef 100644 --- a/server/src/main/java/org/apache/druid/server/http/TaskLockResponse.java +++ b/server/src/main/java/org/apache/druid/server/http/TaskLockResponse.java @@ -21,54 +21,34 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.metadata.TaskLockInfo; +import org.apache.druid.metadata.ActiveTaskLockInfo; import java.util.List; import java.util.Map; -import java.util.Objects; public class TaskLockResponse { - private final Map> taskLocks; + private final Map> datasourceToLocks; @JsonCreator public TaskLockResponse( - @JsonProperty("taskLocks") final Map> taskLocks + @JsonProperty("datasourceToLocks") final Map> datasourceToLocks ) { - this.taskLocks = taskLocks; + this.datasourceToLocks = datasourceToLocks; } @JsonProperty - 
public Map> getTaskLocks() + public Map> getDatasourceToLocks() { - return taskLocks; - } - - @Override - public boolean equals(final Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final TaskLockResponse that = (TaskLockResponse) o; - return Objects.equals(taskLocks, that.taskLocks); - } - - @Override - public int hashCode() - { - return Objects.hash(taskLocks); + return datasourceToLocks; } @Override public String toString() { return "TaskLockResponse{" + - "taskLocks='" + taskLocks + + "datasourceToLocks='" + datasourceToLocks + '}'; } } diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java index 8832998239fe..10ebeb53af26 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java @@ -26,7 +26,6 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.LockFilterPolicy; -import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.rpc.ServiceRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; import org.joda.time.Interval; @@ -98,7 +97,7 @@ public ListenableFuture> supervisorStatuses( } @Override - public ListenableFuture>> findConflictingLockInfos( + public ListenableFuture>> findLockedIntervals( List lockFilterPolicies ) { diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java index 2d35399dfa45..5f5837462667 100644 --- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -44,17 +44,16 @@ 
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.metadata.LockFilterPolicy; -import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.MockServiceClient; import org.apache.druid.rpc.RequestBuilder; -import org.apache.druid.server.http.TaskLockResponse; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpVersion; +import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -221,56 +220,46 @@ public void test_taskStatuses_null_null_zero() throws Exception } @Test - public void test_findConflictingLockInfos() throws Exception + public void test_findLockedIntervals() throws Exception { - final Map> lockMap = - ImmutableMap.of( - "foo", - Collections.singletonList( - new TaskLockInfo( - "TIME_CHUNK", - "EXCLUSIVE", - 50, - Intervals.of("2000/2001") - ) - ) - ); + final Map> lockMap = + ImmutableMap.of("foo", Collections.singletonList(Intervals.of("2000/2001"))); final List requests = ImmutableList.of( new LockFilterPolicy("foo", 3, null, null) ); serviceClient.expectAndRespond( - new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/conflictingLocks") + new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals/v2") .jsonContent(jsonMapper, requests), HttpResponseStatus.OK, ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), - jsonMapper.writeValueAsBytes(new TaskLockResponse(lockMap)) + jsonMapper.writeValueAsBytes(lockMap) ); Assert.assertEquals( lockMap, - overlordClient.findConflictingLockInfos(requests).get() + overlordClient.findLockedIntervals(requests).get() ); } @Test - 
public void test_findConflictingLockInfos_nullReturn() throws Exception + public void test_findLockedIntervals_nullReturn() throws Exception { final List requests = ImmutableList.of( new LockFilterPolicy("foo", 3, null, null) ); serviceClient.expectAndRespond( - new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/conflictingLocks") + new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals/v2") .jsonContent(jsonMapper, requests), HttpResponseStatus.OK, ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), - jsonMapper.writeValueAsBytes(new TaskLockResponse(null)) + jsonMapper.writeValueAsBytes(null) ); Assert.assertEquals( Collections.emptyMap(), - overlordClient.findConflictingLockInfos(requests).get() + overlordClient.findLockedIntervals(requests).get() ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index f8c196de48f4..236cfaf7da54 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -61,7 +61,6 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.LockFilterPolicy; -import org.apache.druid.metadata.TaskLockInfo; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.filter.SelectorDimFilter; @@ -1138,7 +1137,7 @@ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask() .thenReturn( Futures.immediateFuture( CloseableIterators.withEmptyBaggage(ImmutableList.of(runningConflictCompactionTask).iterator()))); - Mockito.when(mockClient.findConflictingLockInfos(ArgumentMatchers.any())) + 
Mockito.when(mockClient.findLockedIntervals(ArgumentMatchers.any())) .thenReturn(Futures.immediateFuture(Collections.emptyMap())); Mockito.when(mockClient.cancelTask(conflictTaskId)) .thenReturn(Futures.immediateFuture(null)); @@ -1230,35 +1229,20 @@ public void testRunWithLockedIntervals() // Lock all intervals for dataSource_1 and dataSource_2 final String datasource1 = DATA_SOURCE_PREFIX + 1; - overlordClient.conflictingLockInfos + overlordClient.lockedIntervals .computeIfAbsent(datasource1, k -> new ArrayList<>()) - .add(new TaskLockInfo( - "TIME_CHUNK", - "EXCLUSIVE", - 50, - Intervals.of("2017/2018") - )); + .add(Intervals.of("2017/2018")); final String datasource2 = DATA_SOURCE_PREFIX + 2; - overlordClient.conflictingLockInfos + overlordClient.lockedIntervals .computeIfAbsent(datasource2, k -> new ArrayList<>()) - .add(new TaskLockInfo( - "TIME_CHUNK", - "EXCLUSIVE", - 50, - Intervals.of("2017/2018") - )); + .add(Intervals.of("2017/2018")); // Lock all intervals but one for dataSource_0 final String datasource0 = DATA_SOURCE_PREFIX + 0; - overlordClient.conflictingLockInfos + overlordClient.lockedIntervals .computeIfAbsent(datasource0, k -> new ArrayList<>()) - .add(new TaskLockInfo( - "TIME_CHUNK", - "EXCLUSIVE", - 50, - Intervals.of("2017-01-01T13:00:00Z/2017-02-01") - )); + .add(Intervals.of("2017-01-01T13:00:00Z/2017-02-01")); // Verify that locked intervals are skipped and only one compaction task // is submitted for dataSource_0 @@ -2042,7 +2026,7 @@ private class TestOverlordClient extends NoopOverlordClient private final ObjectMapper jsonMapper; // Map from Task Id to the intervals locked by that task - private final Map> conflictingLockInfos = new HashMap<>(); + private final Map> lockedIntervals = new HashMap<>(); // List of submitted compaction tasks for verification in the tests private final List submittedCompactionTasks = new ArrayList<>(); @@ -2085,11 +2069,11 @@ public ListenableFuture runTask(String taskId, Object taskObject) @Override - 
public ListenableFuture>> findConflictingLockInfos( + public ListenableFuture>> findLockedIntervals( List lockFilterPolicies ) { - return Futures.immediateFuture(conflictingLockInfos); + return Futures.immediateFuture(lockedIntervals); } @Override @@ -2278,7 +2262,7 @@ private static ArgumentCaptor setUpMockClient(final OverlordClient mockC final ArgumentCaptor payloadCaptor = ArgumentCaptor.forClass(Object.class); Mockito.when(mockClient.taskStatuses(null, null, 0)) .thenReturn(Futures.immediateFuture(CloseableIterators.withEmptyBaggage(Collections.emptyIterator()))); - Mockito.when(mockClient.findConflictingLockInfos(ArgumentMatchers.any())) + Mockito.when(mockClient.findLockedIntervals(ArgumentMatchers.any())) .thenReturn(Futures.immediateFuture(Collections.emptyMap())); Mockito.when(mockClient.getTotalWorkerCapacity()) .thenReturn(Futures.immediateFuture(new IndexingTotalWorkerCapacityInfo(0, 0))); From 16aca2a79d1473273386ec34a86b8ba47dcc3bea Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 29 Jul 2024 17:43:32 +0530 Subject: [PATCH 4/6] Use TaskLock directly --- .../indexing/common/actions/TaskLocks.java | 11 -- .../druid/indexing/overlord/TaskLockbox.java | 8 +- .../indexing/overlord/TaskQueryTool.java | 4 +- .../overlord/http/OverlordResource.java | 1 - .../overlord}/http/TaskLockResponse.java | 10 +- .../indexing/overlord/TaskLockboxTest.java | 104 ++++++++--------- .../overlord/http/OverlordResourceTest.java | 35 +++--- .../druid/metadata/ActiveTaskLockInfo.java | 108 ------------------ 8 files changed, 76 insertions(+), 205 deletions(-) rename {server/src/main/java/org/apache/druid/server => indexing-service/src/main/java/org/apache/druid/indexing/overlord}/http/TaskLockResponse.java (83%) delete mode 100644 server/src/main/java/org/apache/druid/metadata/ActiveTaskLockInfo.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java index a875ff1d457a..aeecba0e289d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java @@ -31,7 +31,6 @@ import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.metadata.ActiveTaskLockInfo; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.timeline.DataSegment; import org.joda.time.DateTime; @@ -254,14 +253,4 @@ private static NavigableMap> getTaskLockMap(TaskLockbox .add(taskLock)); return taskLockMap; } - - public static ActiveTaskLockInfo toLockInfo(TaskLock taskLock) - { - return new ActiveTaskLockInfo( - taskLock.getGranularity().name(), - taskLock.getType().name(), - taskLock.getNonNullPriority(), - taskLock.getInterval() - ); - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 1165604147e6..bebb52157d6f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -38,7 +38,6 @@ import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.actions.SegmentAllocateRequest; import org.apache.druid.indexing.common.actions.SegmentAllocateResult; -import org.apache.druid.indexing.common.actions.TaskLocks; import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Tasks; @@ -49,7 +48,6 @@ import org.apache.druid.java.util.common.UOE; import 
org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.metadata.ActiveTaskLockInfo; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.query.QueryContexts; @@ -999,9 +997,9 @@ public Map> getLockedIntervals(List loc * @param lockFilterPolicies Lock filters for the given datasources * @return Map from datasource to list of non-revoked locks with at least as much priority and an overlapping interval */ - public Map> getActiveLocks(List lockFilterPolicies) + public Map> getActiveLocks(List lockFilterPolicies) { - final Map> datasourceToLocks = new HashMap<>(); + final Map> datasourceToLocks = new HashMap<>(); // Take a lock and populate the maps giant.lock(); @@ -1059,7 +1057,7 @@ public Map> getActiveLocks(List new ArrayList<>()) - .add(TaskLocks.toLockInfo(taskLockPosse.getTaskLock())); + .add(taskLockPosse.getTaskLock()); break; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java index c0462887dabd..b25bde067c7f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -27,6 +27,7 @@ import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import org.apache.druid.indexing.overlord.http.TaskStateLookup; @@ -37,7 +38,6 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; -import 
org.apache.druid.metadata.ActiveTaskLockInfo; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; @@ -98,7 +98,7 @@ public Map> getLockedIntervals(List loc * @param lockFilterPolicies Requests for active locks for various datasources * @return Map from datasource to conflicting lock infos */ - public Map> getActiveLocks(List lockFilterPolicies) + public Map> getActiveLocks(List lockFilterPolicies) { return taskLockbox.getActiveLocks(lockFilterPolicies); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 727e2d7e7d54..b62e1de055f2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -64,7 +64,6 @@ import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.server.http.HttpMediaType; import org.apache.druid.server.http.ServletResourceUtils; -import org.apache.druid.server.http.TaskLockResponse; import org.apache.druid.server.http.security.ConfigResourceFilter; import org.apache.druid.server.http.security.DatasourceResourceFilter; import org.apache.druid.server.http.security.StateResourceFilter; diff --git a/server/src/main/java/org/apache/druid/server/http/TaskLockResponse.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskLockResponse.java similarity index 83% rename from server/src/main/java/org/apache/druid/server/http/TaskLockResponse.java rename to indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskLockResponse.java index 1e91a09e93ef..df4a5d8b03c8 100644 --- a/server/src/main/java/org/apache/druid/server/http/TaskLockResponse.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TaskLockResponse.java @@ -17,29 +17,29 @@ * under the License. */ -package org.apache.druid.server.http; +package org.apache.druid.indexing.overlord.http; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.metadata.ActiveTaskLockInfo; +import org.apache.druid.indexing.common.TaskLock; import java.util.List; import java.util.Map; public class TaskLockResponse { - private final Map> datasourceToLocks; + private final Map> datasourceToLocks; @JsonCreator public TaskLockResponse( - @JsonProperty("datasourceToLocks") final Map> datasourceToLocks + @JsonProperty("datasourceToLocks") final Map> datasourceToLocks ) { this.datasourceToLocks = datasourceToLocks; } @JsonProperty - public Map> getDatasourceToLocks() + public Map> getDatasourceToLocks() { return datasourceToLocks; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 77b473136631..b1491ee3ec30 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -37,7 +37,6 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TimeChunkLock; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.common.actions.TaskLocks; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.AbstractTask; @@ -52,7 +51,6 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import 
org.apache.druid.metadata.ActiveTaskLockInfo; import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.LockFilterPolicy; @@ -1284,27 +1282,24 @@ public void testGetLockedIntervalsForLowerPriorityUseConcurrentLocks() @Test public void testGetActiveLocks() { - final Set expectedConflicts = new HashSet<>(); - final ActiveTaskLockInfo overlappingReplaceLock = TaskLocks.toLockInfo( - validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50) - ); - expectedConflicts.add(overlappingReplaceLock); + final Set expectedLocks = new HashSet<>(); + final TaskLock overlappingReplaceLock = + validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50); + expectedLocks.add(overlappingReplaceLock); //Lower priority validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); - final ActiveTaskLockInfo overlappingAppendLock = TaskLocks.toLockInfo( - validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75) - ); - expectedConflicts.add(overlappingAppendLock); + final TaskLock overlappingAppendLock = + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75); + expectedLocks.add(overlappingAppendLock); // Non-overlapping interval validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); - final ActiveTaskLockInfo overlappingExclusiveLock = TaskLocks.toLockInfo( - validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50) - ); - expectedConflicts.add(overlappingExclusiveLock); + final TaskLock overlappingExclusiveLock = + validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50); + expectedLocks.add(overlappingExclusiveLock); LockFilterPolicy policy = new LockFilterPolicy( "none", @@ -1320,20 +1315,19 @@ public void testGetActiveLocks() null ); - Map> 
conflictingLocks = + Map> activeLocks = lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource)); - Assert.assertEquals(1, conflictingLocks.size()); - Assert.assertEquals(expectedConflicts, new HashSet<>(conflictingLocks.get("none"))); + Assert.assertEquals(1, activeLocks.size()); + Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none"))); } @Test public void testGetActiveLocksWithAppendLockIgnoresAppendLocks() { - final Set expectedConflicts = new HashSet<>(); - final ActiveTaskLockInfo overlappingReplaceLock = TaskLocks.toLockInfo( - validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50) - ); - expectedConflicts.add(overlappingReplaceLock); + final Set expectedLocks = new HashSet<>(); + final TaskLock overlappingReplaceLock = + validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50); + expectedLocks.add(overlappingReplaceLock); //Lower priority validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); @@ -1343,10 +1337,9 @@ public void testGetActiveLocksWithAppendLockIgnoresAppendLocks() // Non-overlapping interval validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); - final ActiveTaskLockInfo overlappingExclusiveLock = TaskLocks.toLockInfo( - validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50) - ); - expectedConflicts.add(overlappingExclusiveLock); + final TaskLock overlappingExclusiveLock = + validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50); + expectedLocks.add(overlappingExclusiveLock); LockFilterPolicy policy = new LockFilterPolicy( "none", @@ -1362,20 +1355,19 @@ public void testGetActiveLocksWithAppendLockIgnoresAppendLocks() null ); - Map> conflictingLocks = + Map> activeLocks = lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource)); - Assert.assertEquals(1, conflictingLocks.size()); - 
Assert.assertEquals(expectedConflicts, new HashSet<>(conflictingLocks.get("none"))); + Assert.assertEquals(1, activeLocks.size()); + Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none"))); } @Test public void testGetActiveLocksWithConcurrentLocksIgnoresAppendLocks() { - final Set expectedConflicts = new HashSet<>(); - final ActiveTaskLockInfo overlappingReplaceLock = TaskLocks.toLockInfo( - validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50) - ); - expectedConflicts.add(overlappingReplaceLock); + final Set expectedLocks = new HashSet<>(); + final TaskLock overlappingReplaceLock = + validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50); + expectedLocks.add(overlappingReplaceLock); //Lower priority validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); @@ -1385,10 +1377,9 @@ public void testGetActiveLocksWithConcurrentLocksIgnoresAppendLocks() // Non-overlapping interval validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); - final ActiveTaskLockInfo overlappingExclusiveLock = TaskLocks.toLockInfo( - validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50) - ); - expectedConflicts.add(overlappingExclusiveLock); + final TaskLock overlappingExclusiveLock = + validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50); + expectedLocks.add(overlappingExclusiveLock); LockFilterPolicy policy = new LockFilterPolicy( "none", @@ -1404,37 +1395,34 @@ public void testGetActiveLocksWithConcurrentLocksIgnoresAppendLocks() null ); - Map> conflictingLocks = + Map> activeLocks = lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource)); - Assert.assertEquals(1, conflictingLocks.size()); - Assert.assertEquals(expectedConflicts, new HashSet<>(conflictingLocks.get("none"))); + Assert.assertEquals(1, activeLocks.size()); + Assert.assertEquals(expectedLocks, new 
HashSet<>(activeLocks.get("none"))); } @Test public void testGetActiveLocksWithoutConcurrentLocksConsidersAppendLocks() { - final Set expectedConflicts = new HashSet<>(); - final ActiveTaskLockInfo overlappingReplaceLock = TaskLocks.toLockInfo( - validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50) - ); + final Set expectedLocks = new HashSet<>(); + final TaskLock overlappingReplaceLock = + validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50); - expectedConflicts.add(overlappingReplaceLock); + expectedLocks.add(overlappingReplaceLock); //Lower priority validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25); - final ActiveTaskLockInfo overlappingAppendLock = TaskLocks.toLockInfo( - validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75) - ); - expectedConflicts.add(overlappingAppendLock); + final TaskLock overlappingAppendLock = + validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75); + expectedLocks.add(overlappingReplaceLock); // Non-overlapping interval validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); - final ActiveTaskLockInfo overlappingExclusiveLock = TaskLocks.toLockInfo( - validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50) - ); - expectedConflicts.add(overlappingExclusiveLock); + final TaskLock overlappingExclusiveLock = + validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50); + expectedLocks.add(overlappingExclusiveLock); LockFilterPolicy policy = new LockFilterPolicy( "none", @@ -1450,10 +1438,10 @@ public void testGetActiveLocksWithoutConcurrentLocksConsidersAppendLocks() null ); - Map> conflictingLocks = + Map> activeLocks = lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource)); - Assert.assertEquals(1, conflictingLocks.size()); - Assert.assertEquals(expectedConflicts, new 
HashSet<>(conflictingLocks.get("none"))); + Assert.assertEquals(1, activeLocks.size()); + Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none"))); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 27f8ea739ee3..93f80ac97096 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -36,6 +36,9 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.TimeChunkLock; import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; @@ -61,14 +64,12 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.UOE; -import org.apache.druid.metadata.ActiveTaskLockInfo; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.apache.druid.segment.TestHelper; -import org.apache.druid.server.http.TaskLockResponse; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthConfig; @@ -1107,21 +1108,25 @@ public void testGetActiveLocks() throws Exception final List lockFilterPolicies = ImmutableList.of( new 
LockFilterPolicy("ds1", 25, null, null) ); - final Map> expectedLocks = Collections.singletonMap( + final Map> expectedLocks = Collections.singletonMap( "ds1", Arrays.asList( - new ActiveTaskLockInfo( - "TIME_CHUNK", - "EXCLUSIVE", - 25, - Intervals.of("2012-01-01/2012-01-02") + new TimeChunkLock( + TaskLockType.REPLACE, + "groupId", + "datasource", + Intervals.of("2012-01-01/2012-01-02"), + "version", + 25 ), - new ActiveTaskLockInfo( - "TIME_CHUNK", - "EXCLUSIVE", - 75, - Intervals.of("2012-01-01/2012-01-02") - ) + new TimeChunkLock( + TaskLockType.EXCLUSIVE, + "groupId", + "datasource", + Intervals.of("2012-01-02/2012-01-03"), + "version", + 75 + ) ) ); @@ -1133,7 +1138,7 @@ public void testGetActiveLocks() throws Exception Assert.assertEquals(200, response.getStatus()); final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - Map> observedLocks = jsonMapper.readValue( + Map> observedLocks = jsonMapper.readValue( jsonMapper.writeValueAsString(response.getEntity()), new TypeReference() { diff --git a/server/src/main/java/org/apache/druid/metadata/ActiveTaskLockInfo.java b/server/src/main/java/org/apache/druid/metadata/ActiveTaskLockInfo.java deleted file mode 100644 index 07fda10ff74a..000000000000 --- a/server/src/main/java/org/apache/druid/metadata/ActiveTaskLockInfo.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.metadata; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.joda.time.Interval; - -import java.util.Objects; - -/** - * Contains information about an active task lock for a given datasource - */ -public class ActiveTaskLockInfo -{ - private final String granularity; - private final String type; - private final int priority; - private final Interval interval; - - @JsonCreator - public ActiveTaskLockInfo( - @JsonProperty("granularity") String granularity, - @JsonProperty("type") String type, - @JsonProperty("priority") int priority, - @JsonProperty("interval") Interval interval - ) - { - this.granularity = granularity; - this.type = type; - this.priority = priority; - this.interval = interval; - } - - @JsonProperty - public String getGranularity() - { - return granularity; - } - - @JsonProperty - public String getType() - { - return type; - } - - @JsonProperty - public int getPriority() - { - return priority; - } - - @JsonProperty - public Interval getInterval() - { - return interval; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ActiveTaskLockInfo that = (ActiveTaskLockInfo) o; - return Objects.equals(granularity, that.granularity) - && Objects.equals(type, that.type) - && priority == that.priority - && Objects.equals(interval, that.interval); - } - - @Override - public int hashCode() - { - return Objects.hash(granularity, type, 
priority, interval); - } - - @Override - public String toString() - { - return "ActiveTaskLockInfo{" + - "granularity=" + granularity + - ", type=" + type + - ", interval=" + interval + - ", priority=" + priority + - '}'; - } -} From de793fbdc9f97563a39b11bf95f93a7476a97c4d Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 29 Jul 2024 21:26:45 +0530 Subject: [PATCH 5/6] Fix tests and coverage --- .../indexing/overlord/TaskLockboxTest.java | 2 +- .../druid/metadata/LockFilterPolicy.java | 22 ------------------- 2 files changed, 1 insertion(+), 23 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index b1491ee3ec30..a8c4b5117b1b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1415,7 +1415,7 @@ public void testGetActiveLocksWithoutConcurrentLocksConsidersAppendLocks() final TaskLock overlappingAppendLock = validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75); - expectedLocks.add(overlappingReplaceLock); + expectedLocks.add(overlappingAppendLock); // Non-overlapping interval validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75); diff --git a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java index 4cd0f548cd67..5864e28acc1d 100644 --- a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java +++ b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java @@ -77,26 +77,4 @@ public List getIntervals() { return intervals; } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - 
LockFilterPolicy that = (LockFilterPolicy) o; - return Objects.equals(datasource, that.datasource) - && priority == that.priority - && Objects.equals(intervals, that.intervals) - && Objects.equals(context, that.context); - } - - @Override - public int hashCode() - { - return Objects.hash(datasource, priority, intervals, context); - } } From 7a06f735724f742d93b51f8e430b256dc9b41629 Mon Sep 17 00:00:00 2001 From: Amatya Date: Tue, 30 Jul 2024 08:52:31 +0530 Subject: [PATCH 6/6] Unused import --- .../main/java/org/apache/druid/metadata/LockFilterPolicy.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java index 5864e28acc1d..019fd22807c3 100644 --- a/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java +++ b/server/src/main/java/org/apache/druid/metadata/LockFilterPolicy.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; /** * Specifies a policy to filter active locks held by a datasource