Merge pull request #224 from maxfischer2781/feature/bulk_htcondor
Bulk Executor and HTCondor Bulk Operations
giffels authored Feb 9, 2022
2 parents eb1e91c + f8b9c70 commit 1c30c48
Showing 9 changed files with 511 additions and 78 deletions.
8 changes: 8 additions & 0 deletions docs/source/adapters/site.rst
@@ -199,6 +199,14 @@ Available adapter configuration options
| Option         | Short Description                                                                      | Requirement     |
+================+========================================================================================+=================+
| max_age        | The result of the `condor_status` call is cached for `max_age` in minutes.            | **Required**    |
+----------------+----------------------------------------------------------------------------------------+-----------------+
| bulk_size      | Maximum number of jobs to handle per bulk invocation of a condor tool.                | **Optional**    |
+                +                                                                                        +                 +
|                | Default: 100                                                                           |                 |
+----------------+----------------------------------------------------------------------------------------+-----------------+
| bulk_delay     | Maximum duration in seconds to wait per bulk invocation of a condor tool.             | **Optional**    |
+                +                                                                                        +                 +
|                | Default: 1.0                                                                           |                 |
+----------------+----------------------------------------------------------------------------------------+-----------------+
| executor       | The |executor| used to run submission and further calls to the HTCondor batch system. | **Optional**    |
+                +                                                                                        +                 +
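The two new options control how many per-drone requests are merged into a single condor tool invocation. As a minimal sketch, assuming the usual TARDIS YAML layout (the site name, quota, and option values below are illustrative):

```yaml
Sites:
  - name: MyHTCondorSite
    adapter: HTCondor
    quota: 100

MyHTCondorSite:
  max_age: 1        # required: cache condor_status results for one minute
  bulk_size: 250    # optional: at most 250 jobs per condor tool invocation
  bulk_delay: 0.5   # optional: dispatch a partial bulk after at most 0.5 s
```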
2 changes: 2 additions & 0 deletions setup.py
@@ -89,6 +89,8 @@ def get_cryptography_version():
"typer",
"bcrypt",
"python-multipart",
"typing_extensions",
"backports.cached_property",
],
extras_require={
"docs": [
188 changes: 154 additions & 34 deletions tardis/adapters/sites/htcondor.py
@@ -1,25 +1,37 @@
from typing import Iterable, Tuple, Awaitable
from ...exceptions.executorexceptions import CommandExecutionFailure
from ...exceptions.tardisexceptions import TardisError
from ...exceptions.tardisexceptions import TardisResourceStatusUpdateFailed
from ...interfaces.siteadapter import SiteAdapter
from ...interfaces.siteadapter import ResourceStatus
from ...interfaces.executor import Executor
from ...utilities.asynccachemap import AsyncCacheMap
from ...utilities.attributedict import AttributeDict
from ...utilities.staticmapping import StaticMapping
from ...utilities.executors.shellexecutor import ShellExecutor
from ...utilities.asyncbulkcall import AsyncBulkCall
from ...utilities.utils import csv_parser, machine_meta_data_translation

from contextlib import contextmanager
from datetime import datetime
from functools import partial
from string import Template

import warnings
import logging
import re

logger = logging.getLogger("cobald.runtime.tardis.adapters.sites.htcondor")


# TODO: Remove this once the old-style UUIDs are deprecated
def _job_id(resource_uuid: str) -> str:
"""
Normalize single "ClusterID" and bulk "ClusterID.ProcID" UUIDs to job IDs
"""
return resource_uuid if "." in resource_uuid else f"{resource_uuid}.0"
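# e.g. an old-style UUID "15556" normalizes to "15556.0", while a bulk-style
# "15556.2" passes through unchanged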


async def htcondor_queue_updater(executor):
    attributes = dict(
        Owner="Owner", JobStatus="JobStatus", ClusterId="ClusterId", ProcId="ProcId"
@@ -40,10 +52,122 @@ async def htcondor_queue_updater(executor):
        delimiter="\t",
        replacements=dict(undefined=None),
    ):
        htcondor_queue[row["ClusterId"]] = row
        row["JobId"] = f"{row['ClusterId']}.{row['ProcId']}"
        htcondor_queue[row["JobId"]] = row
    return htcondor_queue


JDL = str
# search the Job ID in a submit Proc line
SUBMIT_ID_PATTERN = re.compile(r"Proc\s(\d+\.\d+)")
# search for job queue commands
JDL_QUEUE_PATTERN = re.compile(r"^queue\s*\d*\s*$", flags=re.MULTILINE)


def _submit_description(resource_jdls: Tuple[JDL, ...]) -> str:
    commands = []
    for jdl in resource_jdls:
        commands.append(jdl)
        if JDL_QUEUE_PATTERN.search(jdl):
            warnings.warn(
                "Condor JDL templates may not include queue commands",
                FutureWarning,
            )
        else:
            commands.append("queue 1")
    return "\n".join(commands)


async def condor_submit(*resource_jdls: JDL, executor: Executor) -> Iterable[str]:
"""Submit a number of resources from their JDL, reporting the new Job ID for each"""
# verbose submit gives an ordered listing of class ads, such as
# ** Proc 15556.0:
# Args = "150"
# ClusterId = 15556
# ...
# ProcId = 0
# QDate = 1641289701
# ...
#
# ** Proc 15556.1:
# ...
command = f"condor_submit -verbose -maxjobs {len(resource_jdls)}"
response = await executor.run_command(
command,
stdin_input=_submit_description(resource_jdls),
)
return (
SUBMIT_ID_PATTERN.search(line).group(1)
for line in response.stdout.splitlines()
if line.startswith("** Proc")
)
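# for a verbose listing like the one above, this yields "15556.0", "15556.1", ...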


# condor_rm and condor_suspend are actually the same tool under the hood
# they only differ in the method called on the Schedd and their success message
def condor_rm(
    *resource_attributes: AttributeDict, executor: Executor
) -> Awaitable[Iterable[bool]]:
    """Remove a number of resources, indicating success for each"""
    return _condor_tool(
        resource_attributes, executor, "condor_rm", "marked for removal"
    )


def condor_suspend(
    *resource_attributes: AttributeDict, executor: Executor
) -> Awaitable[Iterable[bool]]:
    """Suspend a number of resources, indicating success for each"""
    return _condor_tool(resource_attributes, executor, "condor_suspend", "suspended")


# search the Job ID in a remove/suspend mark line
TOOL_ID_PATTERN = re.compile(r"Job\s(\d+\.\d+)")
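# e.g. this captures "15540.0" from "Job 15540.0 marked for removal"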


async def _condor_tool(
    resource_attributes: Tuple[AttributeDict, ...],
    executor: Executor,
    command: str,
    success_message: str,
) -> Iterable[bool]:
    """
    Generic call to modify a number of condor jobs and indicate success for each

    The ``command`` and ``success_message`` should match the specific tool,
    e.g. ``condor_rm`` reports ``Job XY.Z marked for removal`` and thus corresponds to
    ``_condor_tool(..., "condor_rm", "marked for removal")``.
    """
    command = (
        command
        + " "
        + " ".join(
            _job_id(resource.remote_resource_uuid) for resource in resource_attributes
        )
    )
    try:
        response = await executor.run_command(command)
    except CommandExecutionFailure as cef:
        # the tool fails if none of the jobs are found – because they all just shut down
        # report graceful failure for all
        if cef.exit_code == 1 and "not found" in cef.stderr:
            return [False] * len(resource_attributes)
        raise
    # successes are in stdout, failures in stderr, both in argument order
    # stdout: Job 15540.0 marked for removal
    # stderr: Job 15612.0 not found
    # stderr: Job 15535.0 marked for removal
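    # e.g. for jobs "15540.0" and "15612.0" as above, the result is (True, False)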
    success_jobs = {
        TOOL_ID_PATTERN.search(line).group(1)
        for line in response.stdout.splitlines()
        if line.endswith(success_message)
    }
    return (
        _job_id(resource.remote_resource_uuid) in success_jobs
        for resource in resource_attributes
    )


# According to https://htcondor.readthedocs.io/en/latest/classad-attributes/
# job-classad-attributes.html
htcondor_status_codes = {
@@ -62,13 +186,27 @@ class HTCondorAdapter(SiteAdapter):
        Cores=1, Memory=1024, Disk=1024 * 1024
    )

    def __init__(self, machine_type: str, site_name: str):
    def __init__(
        self,
        machine_type: str,
        site_name: str,
    ):
        self._machine_type = machine_type
        self._site_name = site_name
        self._executor = getattr(self.configuration, "executor", ShellExecutor())
        bulk_size = getattr(self.configuration, "bulk_size", 100)
        bulk_delay = getattr(self.configuration, "bulk_delay", 1.0)
        self._condor_submit, self._condor_suspend, self._condor_rm = (
            AsyncBulkCall(
                partial(tool, executor=self._executor),
                size=bulk_size,
                delay=bulk_delay,
            )
            for tool in (condor_submit, condor_suspend, condor_rm)
        )
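        # awaiting a single call such as `await self._condor_submit(jdl)` queues it;
        # AsyncBulkCall dispatches one bulk condor invocation for all queued calls
        # once `bulk_size` calls have accumulated or `bulk_delay` has passed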

        key_translator = StaticMapping(
            remote_resource_uuid="ClusterId",
            remote_resource_uuid="JobId",
            resource_status="JobStatus",
            created="created",
            updated="updated",
@@ -116,13 +254,8 @@ async def deploy_resource(
            ),
        )

        response = await self._executor.run_command(
            "condor_submit", stdin_input=submit_jdl
        )
        pattern = re.compile(
            r"^.*?(?P<Jobs>\d+).*?(?P<ClusterId>\d+).$", flags=re.MULTILINE
        )
        response = AttributeDict(pattern.search(response.stdout).groupdict())
        job_id = await self._condor_submit(submit_jdl)
        response = AttributeDict(JobId=job_id)
        response.update(self.create_timestamps())
        return self.handle_response(response)

@@ -131,7 +264,7 @@ async def resource_status(
    ) -> AttributeDict:
        await self._htcondor_queue.update_status()
        try:
            resource_uuid = resource_attributes.remote_resource_uuid
            resource_uuid = _job_id(resource_attributes.remote_resource_uuid)
            resource_status = self._htcondor_queue[resource_uuid]
        except KeyError:
            # In case the created timestamp is after last update timestamp of the
@@ -146,36 +279,23 @@
        else:
            return self.handle_response(resource_status)

    async def _apply_condor_command(
        self, resource_attributes: AttributeDict, condor_command: str
    ):
        command = f"{condor_command} {resource_attributes.remote_resource_uuid}"
        try:
            response = await self._executor.run_command(command)
        except CommandExecutionFailure as cef:
            if cef.exit_code == 1 and "Couldn't find" in cef.stderr:
                # Happens if condor_suspend/condor_rm is called in the moment
                # the drone is shutting down itself. Repeat the procedure until
                # resource has vanished from condor_q call
                raise TardisResourceStatusUpdateFailed from cef
            raise
        pattern = re.compile(r"^.*?(?P<ClusterId>\d+).*$", flags=re.MULTILINE)
        response = AttributeDict(pattern.search(response.stdout).groupdict())
        return self.handle_response(response)

    async def stop_resource(self, resource_attributes: AttributeDict):
        """
        Stopping machines is equivalent to suspending jobs in HTCondor,
        therefore condor_suspend is called!
        """
        return await self._apply_condor_command(
            resource_attributes, condor_command="condor_suspend"
        )
        resource_uuid = resource_attributes.remote_resource_uuid
        if await self._condor_suspend(resource_attributes):
            return self.handle_response(AttributeDict(JobId=resource_uuid))
        logger.debug(f"condor_suspend failed for {resource_uuid}")
        raise TardisResourceStatusUpdateFailed

    async def terminate_resource(self, resource_attributes: AttributeDict):
        return await self._apply_condor_command(
            resource_attributes, condor_command="condor_rm"
        )
        resource_uuid = resource_attributes.remote_resource_uuid
        if await self._condor_rm(resource_attributes):
            return self.handle_response(AttributeDict(JobId=resource_uuid))
        logger.debug(f"condor_rm failed for {resource_uuid}")
        raise TardisResourceStatusUpdateFailed

    @staticmethod
    def create_timestamps():
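All three condor tools are wrapped in the new AsyncBulkCall utility: every drone awaits its own call, while the calls are transparently collected and executed as a single bulk invocation. A minimal runnable sketch of this pattern, assuming only the constructor and per-call semantics visible in this diff (`upper_bulk` and its inputs are hypothetical):

```python
import asyncio

from tardis.utilities.asyncbulkcall import AsyncBulkCall


async def upper_bulk(*items: str):
    """Hypothetical bulk command: process many items in one invocation"""
    print(f"one bulk invocation for {len(items)} items")
    return [item.upper() for item in items]


async def main():
    # collect up to 100 individual calls for at most 1 second,
    # then execute them as a single upper_bulk(...) invocation
    bulk_upper = AsyncBulkCall(upper_bulk, size=100, delay=1.0)
    results = await asyncio.gather(*(bulk_upper(item) for item in "abc"))
    print(results)  # ['A', 'B', 'C']


asyncio.run(main())
```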
15 changes: 14 additions & 1 deletion tardis/interfaces/executor.py
@@ -1,7 +1,20 @@
from typing import Optional
from typing_extensions import Protocol
from abc import ABCMeta, abstractmethod


class CommandResult(Protocol):
    stdout: str
    stderr: str
    exit_code: int


class Executor(metaclass=ABCMeta):
    @abstractmethod
    async def run_command(self, command):
    async def run_command(
        self, command: str, stdin_input: Optional[str] = None
    ) -> CommandResult:
        """
        Run ``command`` in a shell and provide the result
        """
        return NotImplemented
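Since `CommandResult` is a Protocol, any object with matching attributes satisfies it structurally. A minimal conforming executor as a sketch (`EchoExecutor` is illustrative and not part of this commit):

```python
from typing import Optional

from tardis.interfaces.executor import Executor
from tardis.utilities.attributedict import AttributeDict


class EchoExecutor(Executor):
    """Hypothetical executor that echoes the command instead of running it"""

    async def run_command(self, command: str, stdin_input: Optional[str] = None):
        # AttributeDict offers attribute access, matching the CommandResult protocol
        return AttributeDict(stdout=command, stderr="", exit_code=0)
```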
2 changes: 1 addition & 1 deletion tardis/interfaces/siteadapter.py
@@ -157,7 +157,7 @@ def handle_response(
        of the original response of the provider in keys of the common format.
        :type key_translator: dict
        :param translator_functions: A dictionary containing functions to
            transform value of the original reponse of the provider into values of
            transform value of the original response of the provider into values of
            the common format.
        :type translator_functions: dict
        :param additional_content: Additional content to be put into response,