Add singularity and slurm support #403

Open
wants to merge 30 commits into master
Changes from 1 commit

Commits (30)
dba5d7d
Added code to support Singularity on HiperGator
suhasthegame Jan 3, 2024
75b8156
Testing for docker_entrypoint
suhasthegame Jan 9, 2024
2a9b195
Intermediate Commit --ignore
suhasthegame Feb 14, 2024
0705809
Added Dependencies --ignore
suhasthegame Feb 19, 2024
9957574
Ignore --all
suhasthegame Feb 21, 2024
5f2aac3
Added version number to girder worker
suhasthegame Feb 27, 2024
03b3717
Changed version
suhasthegame Feb 27, 2024
ba0c9c3
Reverted version back to original
suhasthegame Feb 28, 2024
10a4740
Intermediate Changes Please ignore
Apr 16, 2024
b1889c1
Temporary commit
Apr 30, 2024
b3429fe
Almost working
May 7, 2024
0ff3fec
Bug Fixes
May 14, 2024
27f790b
Addressed GPU config issue for plugins
May 16, 2024
a0f1b72
UF Progress
willdunklin May 21, 2024
9524112
Intermediate commit
Jun 24, 2024
31367c6
Fixed GPU and CPU allocation
Jul 1, 2024
3b08406
Added code to clean up tmp folders after job
Jul 10, 2024
8dadcc3
Merge branch 'migration' of https://github.com/suhasthegame/girder_wo…
willdunklin Jul 17, 2024
c7a9d3b
Split girder-worker-singularity into a separate package
willdunklin Jul 22, 2024
8baadbc
Update singularity package dependencies
willdunklin Aug 1, 2024
66e99b3
Update singularity threads to track jobId
willdunklin Aug 21, 2024
6c326ee
Refactor singularity-slurm into separate package
willdunklin Aug 27, 2024
32c8e77
Format code
willdunklin Sep 16, 2024
2399313
Add slurm configuration settings
willdunklin Sep 16, 2024
d36a992
Merge branch 'master' of https://github.com/girder/girder_worker into…
willdunklin Sep 25, 2024
5daf316
Format code
willdunklin Sep 26, 2024
7d3783f
Undo test case removal
willdunklin Sep 26, 2024
971a918
Clean up dependencies
willdunklin Oct 2, 2024
bc129a9
Split slurm configuration setting out from worker
willdunklin Oct 2, 2024
2058ecc
Update slurm/singularity extensions to use scm versioning
willdunklin Oct 2, 2024
Split girder-worker-singularity into a separate package
willdunklin committed Jul 22, 2024
commit c7a9d3b2d8e048be7efdfcda3997457b3385789f
2 changes: 1 addition & 1 deletion .gitignore
@@ -4,7 +4,7 @@
/docs/_build/
/tmp/
__pycache__/
/girder_worker.egg-info/
*.egg-info/
/girder-worker-*.tar.gz
*.retry
.vagrant
39 changes: 18 additions & 21 deletions girder_worker/docker/tasks/__init__.py
@@ -390,9 +390,6 @@ def _cleanup_temp_volumes(self, temp_volumes, default_temp_volume):
shutil.rmtree(v.host_path)





def _docker_run(task, image, pull_image=True, entrypoint=None, container_args=None,
volumes=None, remove_container=True, stream_connectors=None, **kwargs):
volumes = volumes or {}
@@ -578,7 +575,7 @@ def singularity_run(task,**kwargs):
'volumes': volumes
}

# Allow run args to be overridden,filter out any we don't want to override
# Allow run args to be overridden,filter out any we don't want to override
extra_run_kwargs = {k: v for k, v in kwargs.items() if k not in BLACKLISTED_DOCKER_RUN_ARGS}
run_kwargs.update(extra_run_kwargs)

@@ -617,9 +614,9 @@ def singularity_exit_condition():
return results
#This function is used to check whether we need to switch to singularity or not.
def use_singularity():
'''
#This needs to be uncommented. Only for testing purposes.
'''
'''
#This needs to be uncommented. Only for testing purposes.
'''
# runtime = os.environ.get('RUNTIME')
# if runtime == 'SINGULARITY':
# return True
@@ -630,8 +627,8 @@ def use_singularity():
# with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
# return s.connect_ex('/var/run/docker.sock') != 0
# except socket.error:
return False
# return True
# return False
return True


@app.task
@@ -647,16 +644,16 @@ def _generate_slurm_script(container_args, kwargs):
raise Exception(' Issue with Slicer_Cli_Plugin_Image. Plugin Not available')
SIF_DIRECTORY = os.getenv('SIF_IMAGE_PATH')
image_full_path = os.path.join(SIF_DIRECTORY,image)
#Code to check for allocating multiple gpus.
#Code to check for allocating multiple gpus.
try:
gpu_index = container_args.index('--gpu')
gpus = int(container_args[gpu_index+1])
nvidia.set_nvidia_params(kwargs,singularity_command,gpus)
except ValueError as e:
if kwargs['nvidia']:
nvidia.set_nvidia_params(kwargs,singularity_command)
try:
pwd = kwargs['pwd']
try:
pwd = kwargs['pwd']
if not pwd:
raise Exception("PWD cannot be empty")
singularity_command.extend(['--pwd',pwd])
@@ -670,15 +667,15 @@ def _generate_slurm_script(container_args, kwargs):
def _monitor_singularity_job(task,slurm_run_command,slurm_config,log_file_name):
"""Create a drmaa session and monitor the job accordingly"""
decodestatus = {drmaa.JobState.UNDETERMINED: 'process status cannot be determined',
drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active',
drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold',
drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold',
drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold',
drmaa.JobState.RUNNING: 'job is running',
drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended',
drmaa.JobState.USER_SUSPENDED: 'job is user suspended',
drmaa.JobState.DONE: 'job finished normally',
drmaa.JobState.FAILED: 'job finished, but failed'}
drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active',
drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold',
drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold',
drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold',
drmaa.JobState.RUNNING: 'job is running',
drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended',
drmaa.JobState.USER_SUSPENDED: 'job is user suspended',
drmaa.JobState.DONE: 'job finished normally',
drmaa.JobState.FAILED: 'job finished, but failed'}
temp_directory = os.getenv('TMPDIR')
submit_dir = '/blue/pinaki.sarder/rc-svc-pinaki.sarder-web/submission'
def job_monitor():
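For readers unfamiliar with drmaa, here is a minimal sketch of how a monitor like _monitor_singularity_job might submit a batch script and poll its state, using the same drmaa.JobState constants mapped in decodestatus above. The script path, SLURM native flags, and polling interval are illustrative assumptions, not values taken from this PR.

import time

import drmaa


def submit_and_watch(script_path, native_spec='--ntasks=1 --mem=4gb'):
    # Sketch only: script_path and native_spec are hypothetical placeholders.
    with drmaa.Session() as session:
        jt = session.createJobTemplate()
        jt.remoteCommand = script_path        # e.g. the generated slurm batch script
        jt.nativeSpecification = native_spec  # scheduler-specific resource flags
        job_id = session.runJob(jt)
        try:
            # Poll until the job reaches a terminal state (see decodestatus above).
            while session.jobStatus(job_id) not in (drmaa.JobState.DONE,
                                                    drmaa.JobState.FAILED):
                time.sleep(5)
            return session.jobStatus(job_id) == drmaa.JobState.DONE
        finally:
            session.deleteJobTemplate(jt)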
Empty file.
10 changes: 10 additions & 0 deletions girder_worker/singularity/girder_worker_singularity/__init__.py
@@ -0,0 +1,10 @@
from girder_worker import GirderWorkerPluginABC


class SingularityPlugin(GirderWorkerPluginABC):

def __init__(self, app, *args, **kwargs):
self.app = app

def task_imports(self):
return ['girder_worker_singularity.tasks']
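For context, girder_worker discovers GirderWorkerPluginABC subclasses through the 'girder_worker_plugins' entry point group, so the new package would be expected to register SingularityPlugin roughly as sketched below. This setup.py is hypothetical: the actual packaging metadata is not part of the excerpt shown, and the entry point name and install_requires list are assumptions.

# Hypothetical setup.py sketch for the girder-worker-singularity package.
from setuptools import find_packages, setup

setup(
    name='girder-worker-singularity',
    packages=find_packages(),
    install_requires=['girder-worker'],
    entry_points={
        'girder_worker_plugins': [
            # girder_worker imports SingularityPlugin at startup and calls
            # task_imports() to load girder_worker_singularity.tasks.
            'singularity = girder_worker_singularity:SingularityPlugin',
        ]
    },
)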
156 changes: 156 additions & 0 deletions girder_worker/singularity/girder_worker_singularity/tasks/__init__.py
@@ -0,0 +1,156 @@
import os
import threading

from girder_worker_utils import _walk_obj
# from slicer_cli_web.singularity.utils import switch_to_sif_image_folder

from girder_worker import logger
from girder_worker.app import Task, app
from girder_worker.docker import utils
from girder_worker.docker.io import FDReadStreamConnector, FDWriteStreamConnector
from girder_worker.docker.tasks import _RequestDefaultTemporaryVolume, _handle_streaming_args
from girder_worker.docker.transforms import TemporaryVolume

BLACKLISTED_DOCKER_RUN_ARGS = ['tty', 'detach']


print('!!!!!!!!!!!!!!')

# Class for SingularityTask similar to DockerTask
class SingularityTask(Task):
def _maybe_transform_argument(self, arg):
return super()._maybe_transform_argument(
arg, task=self, _default_temp_volume=self.request._default_temp_volume)

def _maybe_transform_result(self, idx, result):
return super()._maybe_transform_result(
idx, result, _default_temp_volume=self.request._default_temp_volume)

def __call__(self, *args, **kwargs):
default_temp_volume = _RequestDefaultTemporaryVolume()
self.request._default_temp_volume = default_temp_volume

volumes = kwargs.setdefault('volumes', {})
# If we have a list of volumes, the user provided a list of Volume objects
# that we need to transform.
temp_volumes = []
if isinstance(volumes, list):
# See if we have been passed any TemporaryVolume instances.
for v in volumes:
if isinstance(v, TemporaryVolume):
temp_volumes.append(v)

# First call the transform method; this replaces default temp volumes
# with the instance associated with this task, created above. That is, any
# reference to TemporaryVolume.default
_walk_obj(volumes, self._maybe_transform_argument)

# Now convert them to JSON
def _json(volume):
return volume._repr_json_()

volumes = _walk_obj(volumes, _json)
# We then need to merge them into a single dict and it will be ready
# for docker-py.
volumes = {k: v for volume in volumes for k, v in volume.items()}
kwargs['volumes'] = volumes

volumes.update(default_temp_volume._repr_json_())

try:
super().__call__(*args, **kwargs)
finally:
threading.Thread(
target=self._cleanup_temp_volumes,
args=(temp_volumes, default_temp_volume),
daemon=False).start()

def _cleanup_temp_volumes(self, temp_volumes, default_temp_volume):
# Set the permission to allow cleanup of temp directories
temp_volumes = [v for v in temp_volumes if os.path.exists(v.host_path)]
if default_temp_volume._transformed:
temp_volumes.append(default_temp_volume)

for v in temp_volumes:
utils.remove_tmp_folder_apptainer(v.host_path)


def singularity_run(task,**kwargs):
volumes = kwargs.pop('volumes',{})
container_args = kwargs.pop('container_args',[])
pull_image = kwargs['pull_image'] or False
stream_connectors = kwargs['stream_connectors'] or []
image = kwargs.get('image') or ''
entrypoint = None
if not image:
logger.exception('Image name cannot be empty')
raise Exception('Image name cannot be empty')

run_kwargs = {
'tty': False,
'volumes': volumes
}

# Allow run args to be overridden; filter out any we don't want to override
extra_run_kwargs = {k: v for k, v in kwargs.items() if k not in BLACKLISTED_DOCKER_RUN_ARGS}
run_kwargs.update(extra_run_kwargs)

# Pass along an explicit entrypoint if one was provided
if entrypoint is not None:
run_kwargs['entrypoint'] = entrypoint

log_file_name = kwargs['log_file']

container_args,read_streams,write_streams = _handle_streaming_args(container_args)
#MODIFIED FOR SINGULARITY (CHANGE CODE OF SINGULARITY CONTAINER)
for connector in stream_connectors:
if isinstance(connector, FDReadStreamConnector):
read_streams.append(connector)
elif isinstance(connector, FDWriteStreamConnector):
write_streams.append(connector)
else:
raise TypeError(
"Expected 'FDReadStreamConnector' or 'FDWriterStreamConnector', received '%s'"
% type(connector))

from uf import slurm_dispatch
slurm_dispatch(task, container_args, run_kwargs, read_streams, write_streams, log_file_name)
# slurm_run_command,slurm_config = _run_singularity_container(container_args,**run_kwargs)
# try:
# monitor_thread = _monitor_singularity_job(task,slurm_run_command,slurm_config,log_file_name)
# def singularity_exit_condition():
# return not monitor_thread.is_alive()
# utils.select_loop(exit_condition = singularity_exit_condition,
# readers= read_streams,
# writers = write_streams )
# finally:
# logger.info('DONE')

results = []
if hasattr(task.request,'girder_result_hooks'):
results = (None,) * len(task.request.girder_result_hooks)

return results

#This function is used to check whether we need to switch to singularity or not.
def use_singularity():
'''
#This needs to be uncommented. Only for testing purposes.
'''
# runtime = os.environ.get('RUNTIME')
# if runtime == 'SINGULARITY':
# return True
# if runtime == 'DOCKER':
# return False
# try:
# #Check whether we are connected to a docker socket.
# with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
# return s.connect_ex('/var/run/docker.sock') != 0
# except socket.error:
# return False
return True


@app.task
def container_backend(**kwargs):
return use_singularity()
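The body of use_singularity() above is currently short-circuited to return True; for reference, a cleaned-up sketch of its commented-out detection logic would look like the following. This only restates the commented code (a RUNTIME environment variable plus a probe of /var/run/docker.sock); it is not additional behavior from this PR.

import os
import socket


def use_singularity():
    """Return True when the worker should dispatch jobs via Singularity."""
    runtime = os.environ.get('RUNTIME')
    if runtime == 'SINGULARITY':
        return True
    if runtime == 'DOCKER':
        return False
    try:
        # Probe for a local Docker socket; if we cannot connect, assume Docker
        # is unavailable and fall back to Singularity.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
            return s.connect_ex('/var/run/docker.sock') != 0
    except socket.error:
        return False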