Skip to content

Commit

Permalink
Merge pull request #1790 from fractal-analytics-platform/1782-support…
Browse files Browse the repository at this point in the history
…-creationg-of-multiple-ssh-connections

Support creating multiple ssh connections
  • Loading branch information
tcompa committed Sep 24, 2024
2 parents ff2e608 + e05be73 commit a2421ce
Show file tree
Hide file tree
Showing 18 changed files with 588 additions and 84 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
**Note**: Numbers like (\#1234) point to closed Pull Requests on the fractal-server repository.

# 2.5.2

* App:
* Replace `fractal_ssh` attribute with `fractal_ssh_list`, in `app.state` (\#1790).
* Move creation of SSH connections from app startup to endpoints (\#1790).
* Internal
* Introduce `FractalSSHList`, in view of support for multiple SSH/Slurm service users (\#1790).
* Make `FractalSSH.close()` more aggressively close `Transport` attribute (\#1790).
* Set `look_for_keys=False` for paramiko/fabric connection (\#1790).
* Testing:
* Add fixture to always test that threads do not accumulate during tests (\#1790).

# 2.5.1

Expand Down
14 changes: 13 additions & 1 deletion fractal_server/app/routes/api/v2/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,18 @@ async def apply_workflow(
await db.merge(job)
await db.commit()

# User appropriate FractalSSH object
if settings.FRACTAL_RUNNER_BACKEND == "slurm_ssh":
ssh_credentials = dict(
user=settings.FRACTAL_SLURM_SSH_USER,
host=settings.FRACTAL_SLURM_SSH_HOST,
key_path=settings.FRACTAL_SLURM_SSH_PRIVATE_KEY_PATH,
)
fractal_ssh_list = request.app.state.fractal_ssh_list
fractal_ssh = fractal_ssh_list.get(**ssh_credentials)
else:
fractal_ssh = None

background_tasks.add_task(
submit_workflow,
workflow_id=workflow.id,
Expand All @@ -246,7 +258,7 @@ async def apply_workflow(
worker_init=job.worker_init,
slurm_user=user.slurm_user,
user_cache_dir=user.cache_dir,
fractal_ssh=request.app.state.fractal_ssh,
fractal_ssh=fractal_ssh,
)
request.app.state.jobsV2.append(job.id)
logger.info(
Expand Down
11 changes: 10 additions & 1 deletion fractal_server/app/routes/api/v2/task_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,20 @@ async def collect_tasks_pip(
db.add(state)
await db.commit()

# User appropriate FractalSSH object
ssh_credentials = dict(
user=settings.FRACTAL_SLURM_SSH_USER,
host=settings.FRACTAL_SLURM_SSH_HOST,
key_path=settings.FRACTAL_SLURM_SSH_PRIVATE_KEY_PATH,
)
fractal_ssh_list = request.app.state.fractal_ssh_list
fractal_ssh = fractal_ssh_list.get(**ssh_credentials)

background_tasks.add_task(
background_collect_pip_ssh,
state.id,
task_pkg,
request.app.state.fractal_ssh,
fractal_ssh,
)

response.status_code = status.HTTP_201_CREATED
Expand Down
28 changes: 24 additions & 4 deletions fractal_server/app/runner/executors/slurm/ssh/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,34 @@ def __init__(
settings = Inject(get_settings)
self.python_remote = settings.FRACTAL_SLURM_WORKER_PYTHON
if self.python_remote is None:
self._stop_and_join_wait_thread()
raise ValueError("FRACTAL_SLURM_WORKER_PYTHON is not set. Exit.")

# Initialize connection and perform handshake
self.fractal_ssh = fractal_ssh
logger.warning(self.fractal_ssh)
self.handshake()
try:
self.handshake()
except Exception as e:
logger.warning(
"Stop/join waiting thread and then "
f"re-raise original error {str(e)}"
)
self._stop_and_join_wait_thread()
raise e

# Set/validate parameters for SLURM submission scripts
self.slurm_account = slurm_account
self.common_script_lines = common_script_lines or []
self._validate_common_script_lines()
try:
self._validate_common_script_lines()
except Exception as e:
logger.warning(
"Stop/join waiting thread and then "
f"re-raise original error {str(e)}"
)
self._stop_and_join_wait_thread()
raise e

# Set/initialize some more options
self.keep_pickle_files = keep_pickle_files
Expand Down Expand Up @@ -1385,6 +1402,10 @@ def shutdown(self, wait=True, *, cancel_futures=False):
self.fractal_ssh.run_command(cmd=scancel_command)
logger.debug("Executor shutdown: end")

def _stop_and_join_wait_thread(self):
self.wait_thread.stop()
self.wait_thread.join()

def __exit__(self, *args, **kwargs):
"""
See
Expand All @@ -1393,8 +1414,7 @@ def __exit__(self, *args, **kwargs):
logger.debug(
"[FractalSlurmSSHExecutor.__exit__] Stop and join `wait_thread`"
)
self.wait_thread.stop()
self.wait_thread.join()
self._stop_and_join_wait_thread()
logger.debug("[FractalSlurmSSHExecutor.__exit__] End")

def run_squeue(self, job_ids):
Expand Down
8 changes: 6 additions & 2 deletions fractal_server/app/runner/executors/slurm/sudo/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def __init__(
for line in self.common_script_lines
if line.startswith("#SBATCH --account=")
)
self._stop_and_join_wait_thread()
raise RuntimeError(
"Invalid line in `FractalSlurmExecutor.common_script_lines`: "
f"'{invalid_line}'.\n"
Expand Down Expand Up @@ -1287,6 +1288,10 @@ def shutdown(self, wait=True, *, cancel_futures=False):

logger.debug("Executor shutdown: end")

def _stop_and_join_wait_thread(self):
self.wait_thread.stop()
self.wait_thread.join()

def __exit__(self, *args, **kwargs):
"""
See
Expand All @@ -1295,6 +1300,5 @@ def __exit__(self, *args, **kwargs):
logger.debug(
"[FractalSlurmExecutor.__exit__] Stop and join `wait_thread`"
)
self.wait_thread.stop()
self.wait_thread.join()
self._stop_and_join_wait_thread()
logger.debug("[FractalSlurmExecutor.__exit__] End")
22 changes: 12 additions & 10 deletions fractal_server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,32 +92,34 @@ async def lifespan(app: FastAPI):
settings = Inject(get_settings)

if settings.FRACTAL_RUNNER_BACKEND == "slurm_ssh":
from fractal_server.ssh._fabric import get_ssh_connection
from fractal_server.ssh._fabric import FractalSSH

connection = get_ssh_connection()
app.state.fractal_ssh = FractalSSH(connection=connection)
from fractal_server.ssh._fabric import FractalSSHList

app.state.fractal_ssh_list = FractalSSHList()

logger.info(
f"Created SSH connection "
f"({app.state.fractal_ssh.is_connected=})."
"Added empty FractalSSHList to app.state "
f"(id={id(app.state.fractal_ssh_list)})."
)
else:
app.state.fractal_ssh = None
app.state.fractal_ssh_list = None

config_uvicorn_loggers()
logger.info("End application startup")
reset_logger_handlers(logger)

yield

logger = get_logger("fractal_server.lifespan")
logger.info("Start application shutdown")

if settings.FRACTAL_RUNNER_BACKEND == "slurm_ssh":
logger.info(
f"Closing SSH connection "
f"(current: {app.state.fractal_ssh.is_connected=})."
"Close FractalSSH connections "
f"(current size: {app.state.fractal_ssh_list.size})."
)

app.state.fractal_ssh.close()
app.state.fractal_ssh_list.close_all()

logger.info(
f"Current worker with pid {os.getpid()} is shutting down. "
Expand Down
Loading

0 comments on commit a2421ce

Please sign in to comment.