Skip to content

Commit

Permalink
[hotfix][scheduler] Migrate the Time to Duration for SlotPool in the …
Browse files Browse the repository at this point in the history
…minimum scope
  • Loading branch information
RocMarshal authored and 1996fanrui committed Jul 31, 2024
1 parent 76049d0 commit 82b628d
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
Expand Down Expand Up @@ -282,7 +281,7 @@ public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
@Nonnull SlotRequestId slotRequestId,
@Nonnull ResourceProfile resourceProfile,
@Nonnull Collection<AllocationID> preferredAllocations,
@Nullable Time timeout) {
@Nullable Duration timeout) {
assertRunningInMainThread();

log.debug(
Expand Down Expand Up @@ -318,20 +317,20 @@ public CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
}

private CompletableFuture<PhysicalSlot> internalRequestNewSlot(
PendingRequest pendingRequest, @Nullable Time timeout) {
PendingRequest pendingRequest, @Nullable Duration timeout) {
internalRequestNewAllocatedSlot(pendingRequest);

if (timeout == null) {
return pendingRequest.getSlotFuture();
} else {
return FutureUtils.orTimeout(
pendingRequest.getSlotFuture(),
timeout.toMilliseconds(),
timeout.toMillis(),
TimeUnit.MILLISECONDS,
componentMainThreadExecutor,
String.format(
"Pending slot request %s timed out after %d ms.",
pendingRequest.getSlotRequestId(), timeout.toMilliseconds()))
pendingRequest.getSlotRequestId(), timeout.toMillis()))
.whenComplete(
(physicalSlot, throwable) -> {
if (throwable instanceof TimeoutException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
Expand All @@ -33,6 +32,7 @@

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
Expand Down Expand Up @@ -154,7 +154,9 @@ Optional<PhysicalSlot> allocateAvailableSlot(
* @return a newly allocated slot that was previously not available.
*/
default CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
SlotRequestId slotRequestId, ResourceProfile resourceProfile, @Nullable Time timeout) {
SlotRequestId slotRequestId,
ResourceProfile resourceProfile,
@Nullable Duration timeout) {
return requestNewAllocatedSlot(
slotRequestId, resourceProfile, Collections.emptyList(), timeout);
}
Expand All @@ -175,7 +177,7 @@ CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
SlotRequestId slotRequestId,
ResourceProfile resourceProfile,
Collection<AllocationID> preferredAllocations,
@Nullable Time timeout);
@Nullable Duration timeout);

/**
* Requests the allocation of a new batch slot from the resource manager. Unlike the normal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
@Nonnull SlotRequestId slotRequestId,
@Nonnull ResourceProfile resourceProfile,
@Nonnull Collection<AllocationID> preferredAllocations,
@Nullable Time timeout) {
@Nullable Duration timeout) {
return new CompletableFuture<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class DeclarativeSlotPoolBridgeBuilder {

private JobID jobId = new JobID();
private Duration batchSlotTimeout = JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue();
private Duration idleSlotTimeout = TestingUtils.infiniteTime().toDuration();
private Duration idleSlotTimeout = TestingUtils.infiniteDuration();
private Clock clock = SystemClock.getInstance();
private Duration slotRequestMaxInterval = SLOT_REQUEST_MAX_INTERVAL.defaultValue();
private ComponentMainThreadExecutor mainThreadExecutor = forMainThread();
Expand Down Expand Up @@ -101,7 +101,7 @@ public DeclarativeSlotPoolBridge build() {
jobId,
new DefaultDeclarativeSlotPoolFactory(),
clock,
TestingUtils.infiniteTime().toDuration(),
TestingUtils.infiniteDuration(),
idleSlotTimeout,
batchSlotTimeout,
requestSlotMatchingStrategy,
Expand All @@ -115,7 +115,7 @@ public DeclarativeSlotPoolBridge buildAndStart() throws Exception {
jobId,
new DefaultDeclarativeSlotPoolFactory(),
clock,
TestingUtils.infiniteTime().toDuration(),
TestingUtils.infiniteDuration(),
idleSlotTimeout,
batchSlotTimeout,
requestSlotMatchingStrategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ void testDeclarativeSlotPoolTakesPreferredAllocationsIntoAccount() throws Except
new JobID(),
new DefaultDeclarativeSlotPoolFactory(),
SystemClock.getInstance(),
TestingUtils.infiniteTime().toDuration(),
TestingUtils.infiniteTime().toDuration(),
TestingUtils.infiniteTime().toDuration(),
TestingUtils.infiniteDuration(),
TestingUtils.infiniteDuration(),
TestingUtils.infiniteDuration(),
PreferredAllocationRequestSlotMatchingStrategy.INSTANCE,
Duration.ZERO,
forMainThread());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
Expand All @@ -32,6 +31,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -46,7 +46,7 @@
/** Tests how the {@link DeclarativeSlotPoolBridge} completes slot requests. */
class DeclarativeSlotPoolBridgeRequestCompletionTest {

private static final Time TIMEOUT = SlotPoolUtils.TIMEOUT;
private static final Duration TIMEOUT = SlotPoolUtils.TIMEOUT;

private TestingResourceManagerGateway resourceManagerGateway;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
Expand Down Expand Up @@ -97,7 +96,7 @@ void testRequirementsIncreasedOnNewAllocation() throws Exception {

// requesting the allocation of a new slot should increase the requirements
declarativeSlotPoolBridge.requestNewAllocatedSlot(
new SlotRequestId(), ResourceProfile.UNKNOWN, Time.minutes(5));
new SlotRequestId(), ResourceProfile.UNKNOWN, Duration.ofMinutes(5));
assertThat(requirementListener.getRequirements().getResourceCount(ResourceProfile.UNKNOWN))
.isOne();
}
Expand All @@ -120,7 +119,7 @@ void testRequirementsDecreasedOnAllocationTimeout() throws Exception {
declarativeSlotPoolBridge.requestNewAllocatedSlot(
new SlotRequestId(),
ResourceProfile.UNKNOWN,
Time.milliseconds(5)),
Duration.ofMillis(5)),
mainThreadExecutor)
.get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
Expand Down Expand Up @@ -95,7 +94,7 @@ void testNotEnoughResourcesAvailableFailsPendingRequests() throws Exception {
declarativeSlotPoolBridge.requestNewAllocatedSlot(
slotRequestId,
ResourceProfile.UNKNOWN,
Time.minutes(5)),
Duration.ofMinutes(5)),
componentMainThreadExecutor)
.get();

Expand Down Expand Up @@ -167,7 +166,7 @@ void testNoConcurrentModificationWhenSuspendingAndReleasingSlot() throws Excepti
declarativeSlotPoolBridge.requestNewAllocatedSlot(
slotRequestId,
ResourceProfile.UNKNOWN,
Time.fromDuration(RPC_TIMEOUT));
RPC_TIMEOUT);
slotFuture.whenComplete(
(physicalSlot, throwable) -> {
if (throwable != null) {
Expand Down Expand Up @@ -199,9 +198,7 @@ void testAcceptingOfferedSlotsWithoutResourceManagerConnected() throws Exception

final CompletableFuture<PhysicalSlot> slotFuture =
declarativeSlotPoolBridge.requestNewAllocatedSlot(
new SlotRequestId(),
ResourceProfile.UNKNOWN,
Time.fromDuration(RPC_TIMEOUT));
new SlotRequestId(), ResourceProfile.UNKNOWN, RPC_TIMEOUT);

final LocalTaskManagerLocation localTaskManagerLocation =
new LocalTaskManagerLocation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
Expand All @@ -27,6 +26,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
Expand All @@ -36,7 +36,7 @@
/** Tests for the {@link DeclarativeSlotPoolBridge} interactions. */
class SlotPoolInteractionsTest {

private static final Time fastTimeout = Time.milliseconds(1L);
private static final Duration fastTimeout = Duration.ofMillis(1L);

@RegisterExtension
private static final TestingComponentMainThreadExecutor.Extension EXECUTOR_EXTENSION =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
Expand All @@ -33,6 +32,7 @@
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.FlinkException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand All @@ -48,7 +48,7 @@
/** Testing utility functions for the {@link SlotPool}. */
public class SlotPoolUtils {

public static final Time TIMEOUT = Time.seconds(10L);
public static final Duration TIMEOUT = Duration.ofSeconds(10L);

private SlotPoolUtils() {
throw new UnsupportedOperationException("Cannot instantiate this class.");
Expand Down

0 comments on commit 82b628d

Please sign in to comment.