Implement volumes force detach (#2242)
* Add comments on termination

* Simplify job termination and fix instance locking

* Fix tests

* Implement volume force detach

* Fix duration parameters parsing

* Filter out instances with detaching volumes

* Add docs on Force detach
r4victor authored Jan 30, 2025
1 parent d47ec67 commit 2c3d83a
Showing 25 changed files with 736 additions and 161 deletions.
14 changes: 13 additions & 1 deletion docs/docs/concepts/volumes.md
@@ -84,7 +84,8 @@ Volume my-volume does not exist yet. Create the volume? [y/n]: y

Once created, the volume can be attached to dev environments, tasks, and services.

> When creating a network volume, `dstack` automatically creates an `ext4` filesystem on it.
!!! info "Filesystem"
When creating a new network volume, `dstack` automatically creates an `ext4` filesystem on it.

### Attach a volume { #attach-network-volume }

@@ -137,6 +138,17 @@ and its contents will persist across runs.
to `/workflow` (and sets that as the current working directory). Right now, `dstack` doesn't allow you to
attach volumes to `/workflow` or any of its subdirectories.

### Detach a volume { #detach-network-volume }

`dstack` automatically detaches volumes from instances when a run stops.

!!! info "Force detach"
In some clouds, such as AWS, a volume may get stuck in the detaching state.
To fix this, you can abort the run, and `dstack` will force detach the volume.
`dstack` will also force detach the stuck volume automatically after `stop_duration`.
Note that force detaching a volume is a last-resort measure and may corrupt the filesystem.
Contact your cloud support if you experience volumes stuck in the detaching state.
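
For illustration only, here is a minimal sketch of aborting a run from dstack's Python API so that a stuck volume gets force detached. The `Client.from_config()`, `runs.get()`, and `Run.stop(abort=...)` calls are assumptions about the `dstack.api` client and are not part of this change; the run name is hypothetical.

```python
# Minimal sketch: abort a run so dstack force detaches a stuck volume.
# Assumes the dstack.api client; not part of this commit, and the run name is made up.
from dstack.api import Client

client = Client.from_config()     # load the default project configuration
run = client.runs.get("my-run")   # hypothetical run name
if run is not None:
    # Abort instead of stopping gracefully; per the docs above, dstack then
    # force detaches volumes stuck in the detaching state.
    run.stop(abort=True)
```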

### Manage volumes { #manage-network-volumes }

#### List volumes
3 changes: 3 additions & 0 deletions src/dstack/_internal/cli/services/configurators/run.py
@@ -99,9 +99,11 @@ def apply_configuration(
backends=profile.backends,
regions=profile.regions,
instance_types=profile.instance_types,
reservation=profile.reservation,
spot_policy=profile.spot_policy,
retry_policy=profile.retry_policy,
max_duration=profile.max_duration,
stop_duration=profile.stop_duration,
max_price=profile.max_price,
working_dir=conf.working_dir,
run_name=conf.name,
@@ -110,6 +112,7 @@
creation_policy=profile.creation_policy,
termination_policy=profile.termination_policy,
termination_policy_idle=profile.termination_idle_time,
idle_duration=profile.idle_duration,
)

print_run_plan(run_plan, offers_limit=configurator_args.max_offers)
2 changes: 2 additions & 0 deletions src/dstack/_internal/cli/utils/run.py
@@ -40,6 +40,8 @@ def print_run_plan(run_plan: RunPlan, offers_limit: int = 3):

profile = run_plan.run_spec.merged_profile
creation_policy = profile.creation_policy
# FIXME: This assumes the default idle_duration is the same for client and server.
# If the server changes idle_duration, old clients will see an incorrect value.
termination_policy, termination_idle_time = get_termination(
profile, DEFAULT_RUN_TERMINATION_IDLE_TIME
)
42 changes: 36 additions & 6 deletions src/dstack/_internal/core/backends/aws/compute.py
@@ -40,6 +40,7 @@
VolumeAttachmentData,
VolumeProvisioningData,
)
from dstack._internal.utils.common import get_or_error
from dstack._internal.utils.logging import get_logger

logger = get_logger(__name__)
@@ -630,17 +631,46 @@ def attach_volume(self, volume: Volume, instance_id: str) -> VolumeAttachmentData:
logger.debug("Attached EBS volume %s to instance %s", volume.volume_id, instance_id)
return VolumeAttachmentData(device_name=device_name)

def detach_volume(self, volume: Volume, instance_id: str):
def detach_volume(self, volume: Volume, instance_id: str, force: bool = False):
ec2_client = self.session.client("ec2", region_name=volume.configuration.region)

logger.debug("Detaching EBS volume %s from instance %s", volume.volume_id, instance_id)
ec2_client.detach_volume(
VolumeId=volume.volume_id,
InstanceId=instance_id,
Device=volume.attachment_data.device_name,
)
try:
ec2_client.detach_volume(
VolumeId=volume.volume_id,
InstanceId=instance_id,
Device=get_or_error(volume.attachment_data).device_name,
Force=force,
)
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "IncorrectState":
logger.info(
"Skipping EBS volume %s detach since it's already detached", volume.volume_id
)
return
raise e
logger.debug("Detached EBS volume %s from instance %s", volume.volume_id, instance_id)

def is_volume_detached(self, volume: Volume, instance_id: str) -> bool:
ec2_client = self.session.client("ec2", region_name=volume.configuration.region)

logger.debug("Getting EBS volume %s status", volume.volume_id)
response = ec2_client.describe_volumes(VolumeIds=[volume.volume_id])
volumes_infos = response.get("Volumes")
if len(volumes_infos) == 0:
logger.debug(
"Failed to check EBS volume %s status. Volume not found.", volume.volume_id
)
return True
volume_info = volumes_infos[0]
for attachment in volume_info["Attachments"]:
if attachment["InstanceId"] != instance_id:
continue
if attachment["State"] != "detached":
return False
return True


def get_maximum_efa_interfaces(ec2_client: botocore.client.BaseClient, instance_type: str) -> int:
try:
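For reference, `is_volume_detached()` above walks the `Attachments` list of the EC2 `describe_volumes` response. An abridged sketch of that response shape, with made-up values:

```python
# Abridged shape of the EC2 describe_volumes response inspected by is_volume_detached().
# Values are illustrative only.
response = {
    "Volumes": [
        {
            "VolumeId": "vol-0123456789abcdef0",
            "State": "in-use",  # becomes "available" once fully detached
            "Attachments": [
                {
                    "InstanceId": "i-0123456789abcdef0",
                    "Device": "/dev/sdf",
                    "State": "detaching",  # attaching | attached | detaching | detached
                },
            ],
        },
    ],
}
```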
11 changes: 10 additions & 1 deletion src/dstack/_internal/core/backends/base/compute.py
@@ -177,12 +177,21 @@ def attach_volume(self, volume: Volume, instance_id: str) -> VolumeAttachmentData:
"""
raise NotImplementedError()

def detach_volume(self, volume: Volume, instance_id: str):
def detach_volume(self, volume: Volume, instance_id: str, force: bool = False):
"""
Detaches a volume from the instance.
"""
raise NotImplementedError()

def is_volume_detached(self, volume: Volume, instance_id: str) -> bool:
"""
Checks if a volume was detached from the instance.
If `detach_volume()` may fail to detach the volume,
this method should be overridden to check the volume status.
The caller will trigger force detach if the volume gets stuck detaching.
"""
return True

def _get_offers_cached_key(self, requirements: Optional[Requirements] = None) -> int:
# Requirements is not hashable, so we use a hack to get arguments hash
if requirements is None:
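To make the contract above concrete, here is a minimal sketch of how a caller could combine `is_volume_detached()` with `detach_volume(force=True)` after a graceful detach. The polling loop, interval, and imports are assumptions for illustration and do not reproduce dstack's actual background task:

```python
import time

from dstack._internal.core.backends.base.compute import Compute
from dstack._internal.core.models.volumes import Volume


def detach_with_deadline(
    compute: Compute, volume: Volume, instance_id: str, stop_duration: int = 300
) -> None:
    """Illustrative only: detach gracefully, then force detach if still stuck."""
    compute.detach_volume(volume, instance_id)  # graceful detach first
    deadline = time.monotonic() + stop_duration
    while not compute.is_volume_detached(volume, instance_id):
        if time.monotonic() >= deadline:
            # Last resort: may corrupt the filesystem (see the volumes docs above).
            compute.detach_volume(volume, instance_id, force=True)
            break
        time.sleep(10)
```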
2 changes: 1 addition & 1 deletion src/dstack/_internal/core/backends/gcp/compute.py
@@ -641,7 +641,7 @@ def attach_volume(self, volume: Volume, instance_id: str) -> VolumeAttachmentData:
)
return VolumeAttachmentData(device_name=device_name)

def detach_volume(self, volume: Volume, instance_id: str):
def detach_volume(self, volume: Volume, instance_id: str, force: bool = False):
logger.debug(
"Detaching persistent disk for volume %s from instance %s",
volume.volume_id,
2 changes: 1 addition & 1 deletion src/dstack/_internal/core/backends/local/compute.py
@@ -97,5 +97,5 @@ def delete_volume(self, volume: Volume):
def attach_volume(self, volume: Volume, instance_id: str):
pass

def detach_volume(self, volume: Volume, instance_id: str):
def detach_volume(self, volume: Volume, instance_id: str, force: bool = False):
pass
2 changes: 1 addition & 1 deletion src/dstack/_internal/core/models/fleets.py
@@ -178,7 +178,7 @@ class InstanceGroupParams(CoreModel):
Field(
description="Time to wait before terminating idle instances. Defaults to `5m` for runs and `3d` for fleets. Use `off` for unlimited duration"
),
]
] = None
# Deprecated:
termination_policy: Annotated[
Optional[TerminationPolicy],
52 changes: 43 additions & 9 deletions src/dstack/_internal/core/models/profiles.py
@@ -15,6 +15,8 @@

DEFAULT_INSTANCE_RETRY_DURATION = 60 * 60 * 24 # 24h

DEFAULT_STOP_DURATION = 300


class SpotPolicy(str, Enum):
SPOT = "spot"
@@ -38,16 +40,27 @@ def parse_duration(v: Optional[Union[int, str]]) -> Optional[int]:
return Duration.parse(v)


def parse_max_duration(v: Optional[Union[int, str]]) -> Optional[Union[str, int]]:
# TODO: [Andrey] Not sure this works (see `parse_idle_duration`)
if v == "off":
return v
def parse_max_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]:
return parse_off_duration(v)


def parse_stop_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]:
return parse_off_duration(v)


def parse_off_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]:
if v == "off" or v is False:
return "off"
if v is True:
return None
return parse_duration(v)


def parse_idle_duration(v: Optional[Union[int, str]]) -> Optional[Union[str, int]]:
def parse_idle_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]:
if v is False:
return -1
if v is True:
return None
return parse_duration(v)
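
The expected behavior of these parsers, read directly from the code above (the `300` assumes `Duration.parse` returns seconds):

```python
# Illustrative expectations for the duration parsers above; not a test from this commit.
parse_stop_duration("off")  # -> "off"  (unlimited)
parse_stop_duration(False)  # -> "off"  (False is an alias for "off")
parse_stop_duration(True)   # -> None   (fall back to the default)
parse_stop_duration("5m")   # -> 300    (assuming Duration.parse returns seconds)
parse_idle_duration(False)  # -> -1
parse_idle_duration(True)   # -> None
```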


@@ -136,9 +149,24 @@ class ProfileParams(CoreModel):
Field(description="The policy for resubmitting the run. Defaults to `false`"),
]
max_duration: Annotated[
Optional[Union[Literal["off"], str, int]],
Optional[Union[Literal["off"], str, int, bool]],
Field(
description="The maximum duration of a run (e.g., `2h`, `1d`, etc). After it elapses, the run is forced to stop. Defaults to `off`"
description=(
"The maximum duration of a run (e.g., `2h`, `1d`, etc)."
" After it elapses, the run is automatically stopped."
" Use `off` for unlimited duration. Defaults to `off`"
)
),
]
stop_duration: Annotated[
Optional[Union[Literal["off"], str, int, bool]],
Field(
description=(
"The maximum duration of a run's graceful stopping."
" After it elapses, the run is automatically force stopped."
" This includes force detaching volumes used by the run."
" Use `off` for unlimited duration. Defaults to `5m`"
)
),
]
max_price: Annotated[
@@ -152,9 +180,12 @@
),
]
idle_duration: Annotated[
Optional[Union[Literal["off"], str, int]],
Optional[Union[Literal["off"], str, int, bool]],
Field(
description="Time to wait before terminating idle instances. Defaults to `5m` for runs and `3d` for fleets. Use `off` for unlimited duration"
description=(
"Time to wait before terminating idle instances."
" Defaults to `5m` for runs and `3d` for fleets. Use `off` for unlimited duration"
)
),
]
# Deprecated:
@@ -180,6 +211,9 @@
_validate_max_duration = validator("max_duration", pre=True, allow_reuse=True)(
parse_max_duration
)
_validate_stop_duration = validator("stop_duration", pre=True, allow_reuse=True)(
parse_stop_duration
)
_validate_termination_idle_time = validator(
"termination_idle_time", pre=True, allow_reuse=True
)(parse_duration)
1 change: 1 addition & 0 deletions src/dstack/_internal/core/models/runs.py
@@ -185,6 +185,7 @@ class JobSpec(CoreModel):
image_name: str
privileged: bool = False
max_duration: Optional[int]
stop_duration: Optional[int] = None
registry_auth: Optional[RegistryAuth]
requirements: Requirements
retry: Optional[Retry]
5 changes: 5 additions & 0 deletions src/dstack/_internal/server/background/tasks/process_runs.py
@@ -249,6 +249,11 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel):
JobStatus.TERMINATED,
JobStatus.ABORTED,
}:
# FIXME: This code does not expect JobStatus.TERMINATED status,
# so if a job transitions from RUNNING to TERMINATED,
# the run will transition to PENDING instead of TERMINATING.
# This may not be observed because process_runs is invoked more frequently
# than process_terminating_jobs and because most jobs usually transition to FAILED.
pass # unexpected, but let's ignore it
else:
raise ValueError(f"Unexpected job status {job_model.status}")
@@ -23,6 +23,7 @@
DEFAULT_POOL_NAME,
DEFAULT_RUN_TERMINATION_IDLE_TIME,
CreationPolicy,
Profile,
TerminationPolicy,
)
from dstack._internal.core.models.runs import (
@@ -52,6 +53,7 @@
)
from dstack._internal.server.services.jobs import (
find_job,
get_instances_ids_with_detaching_volumes,
)
from dstack._internal.server.services.locking import get_locker
from dstack._internal.server.services.logging import fmt
@@ -171,16 +173,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
await session.commit()
return

res = await session.execute(
select(PoolModel)
.where(
PoolModel.project_id == project.id,
PoolModel.name == (profile.pool_name or DEFAULT_POOL_NAME),
PoolModel.deleted == False,
)
.options(lazyload(PoolModel.instances))
)
pool = res.scalar_one()
pool = await _get_pool(session=session, project=project, profile=profile)

# Submitted jobs processing happens in two steps (transactions).
# First, the job gets an instance assigned (or no instance).
@@ -204,9 +197,13 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
# Start new transaction to see committed changes after lock
await session.commit()
async with get_locker().lock_ctx(InstanceModel.__tablename__, instances_ids):
# If another job freed the instance but is still trying to detach volumes,
# do not provision on it to prevent attaching volumes that are currently detaching.
detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session)
# Refetch after lock
res = await session.execute(
select(InstanceModel).where(
InstanceModel.id.not_in(detaching_instances_ids),
InstanceModel.id.in_(instances_ids),
InstanceModel.deleted == False,
InstanceModel.job_id.is_(None),
@@ -331,6 +328,19 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
await session.commit()


async def _get_pool(session: AsyncSession, project: ProjectModel, profile: Profile) -> PoolModel:
res = await session.execute(
select(PoolModel)
.where(
PoolModel.project_id == project.id,
PoolModel.name == (profile.pool_name or DEFAULT_POOL_NAME),
PoolModel.deleted == False,
)
.options(lazyload(PoolModel.instances))
)
return res.scalar_one()


async def _assign_job_to_pool_instance(
session: AsyncSession,
pool_instances: List[InstanceModel],
