Skip to content

Commit

Permalink
support customized template header (#376)
Browse files Browse the repository at this point in the history
Fix #213.

---------

Signed-off-by: Jinzhe Zeng <jinzhe.zeng@rutgers.edu>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
njzjz and pre-commit-ci[bot] authored Oct 16, 2023
1 parent e1a29fc commit bf0ba1b
Show file tree
Hide file tree
Showing 12 changed files with 281 additions and 21 deletions.
17 changes: 15 additions & 2 deletions dpdispatcher/distributed_shell.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from dpdispatcher import dlog
from dpdispatcher.JobStatus import JobStatus
from dpdispatcher.machine import Machine
from dpdispatcher.utils import run_cmd_with_all_output
from dpdispatcher.utils import (
customized_script_header_template,
run_cmd_with_all_output,
)

shell_script_header_template = """
#!/bin/bash -l
Expand Down Expand Up @@ -112,7 +115,17 @@ def gen_script_end(self, job):
return script_end

def gen_script_header(self, job):
shell_script_header = shell_script_header_template
resources = job.resources
if (
resources["strategy"].get("customized_script_header_template_file")
is not None
):
shell_script_header = customized_script_header_template(
resources["strategy"]["customized_script_header_template_file"],
resources,
)
else:
shell_script_header = shell_script_header_template
return shell_script_header

def do_submit(self, job):
Expand Down
13 changes: 12 additions & 1 deletion dpdispatcher/dp_cloud_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dpdispatcher.dpcloudserver.config import ALI_OSS_BUCKET_URL
from dpdispatcher.JobStatus import JobStatus
from dpdispatcher.machine import Machine
from dpdispatcher.utils import customized_script_header_template

shell_script_header_template = """
#!/bin/bash -l
Expand Down Expand Up @@ -71,7 +72,17 @@ def gen_script(self, job):
return shell_script

def gen_script_header(self, job):
shell_script_header = shell_script_header_template
resources = job.resources
if (
resources["strategy"].get("customized_script_header_template_file")
is not None
):
shell_script_header = customized_script_header_template(
resources["strategy"]["customized_script_header_template_file"],
resources,
)
else:
shell_script_header = shell_script_header_template
return shell_script_header

def gen_local_script(self, job):
Expand Down
16 changes: 13 additions & 3 deletions dpdispatcher/fugaku.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dpdispatcher import dlog
from dpdispatcher.JobStatus import JobStatus
from dpdispatcher.machine import Machine
from dpdispatcher.utils import customized_script_header_template

fugaku_script_header_template = """\
{queue_name_line}
Expand All @@ -28,9 +29,18 @@ def gen_script_header(self, job):
fugaku_script_header_dict[
"queue_name_line"
] = f'#PJM -L "rscgrp={resources.queue_name}"'
fugaku_script_header = fugaku_script_header_template.format(
**fugaku_script_header_dict
)
if (
resources["strategy"].get("customized_script_header_template_file")
is not None
):
fugaku_script_header = customized_script_header_template(
resources["strategy"]["customized_script_header_template_file"],
resources,
)
else:
fugaku_script_header = fugaku_script_header_template.format(
**fugaku_script_header_dict
)
return fugaku_script_header

def do_submit(self, job):
Expand Down
13 changes: 11 additions & 2 deletions dpdispatcher/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dpdispatcher import dlog
from dpdispatcher.JobStatus import JobStatus
from dpdispatcher.machine import Machine
from dpdispatcher.utils import RetrySignal, retry
from dpdispatcher.utils import RetrySignal, customized_script_header_template, retry

lsf_script_header_template = """\
#!/bin/bash -l
Expand Down Expand Up @@ -60,7 +60,16 @@ def gen_script_header(self, job):
script_header_dict["lsf_number_gpu_line"] = ""
else:
script_header_dict["lsf_number_gpu_line"] = custom_gpu_line
lsf_script_header = lsf_script_header_template.format(**script_header_dict)
if (
resources["strategy"].get("customized_script_header_template_file")
is not None
):
lsf_script_header = customized_script_header_template(
resources["strategy"]["customized_script_header_template_file"],
resources,
)
else:
lsf_script_header = lsf_script_header_template.format(**script_header_dict)

return lsf_script_header

Expand Down
14 changes: 13 additions & 1 deletion dpdispatcher/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import shutil
import time

from dpdispatcher.utils import customized_script_header_template

try:
from bohriumsdk.client import Client
from bohriumsdk.job import Job
Expand Down Expand Up @@ -43,7 +45,17 @@ def gen_script(self, job):
return shell_script

def gen_script_header(self, job):
shell_script_header = shell_script_header_template
resources = job.resources
if (
resources["strategy"].get("customized_script_header_template_file")
is not None
):
shell_script_header = customized_script_header_template(
resources["strategy"]["customized_script_header_template_file"],
resources,
)
else:
shell_script_header = shell_script_header_template
return shell_script_header

def gen_local_script(self, job):
Expand Down
27 changes: 25 additions & 2 deletions dpdispatcher/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dpdispatcher import dlog
from dpdispatcher.JobStatus import JobStatus
from dpdispatcher.machine import Machine
from dpdispatcher.utils import customized_script_header_template

pbs_script_header_template = """
#!/bin/bash -l
Expand All @@ -28,7 +29,18 @@ def gen_script_header(self, job):
"select_node_line"
] += f":ngpus={resources.gpu_per_node}"
pbs_script_header_dict["queue_name_line"] = f"#PBS -q {resources.queue_name}"
pbs_script_header = pbs_script_header_template.format(**pbs_script_header_dict)
if (
resources["strategy"].get("customized_script_header_template_file")
is not None
):
pbs_script_header = customized_script_header_template(
resources["strategy"]["customized_script_header_template_file"],
resources,
)
else:
pbs_script_header = pbs_script_header_template.format(
**pbs_script_header_dict
)
return pbs_script_header

def do_submit(self, job):
Expand Down Expand Up @@ -149,5 +161,16 @@ def gen_script_header(self, job):
gpu_per_node=resources.gpu_per_node
)
pbs_script_header_dict["queue_name_line"] = f"#PBS -q {resources.queue_name}"
pbs_script_header = pbs_script_header_template.format(**pbs_script_header_dict)
if (
resources["strategy"].get("customized_script_header_template_file")
is not None
):
pbs_script_header = customized_script_header_template(
resources["strategy"]["customized_script_header_template_file"],
resources,
)
else:
pbs_script_header = pbs_script_header_template.format(
**pbs_script_header_dict
)
return pbs_script_header
13 changes: 12 additions & 1 deletion dpdispatcher/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dpdispatcher import dlog
from dpdispatcher.JobStatus import JobStatus
from dpdispatcher.machine import Machine
from dpdispatcher.utils import customized_script_header_template

shell_script_header_template = """
#!/bin/bash -l
Expand All @@ -15,7 +16,17 @@ def gen_script(self, job):
return shell_script

def gen_script_header(self, job):
shell_script_header = shell_script_header_template
resources = job.resources
if (
resources["strategy"].get("customized_script_header_template_file")
is not None
):
shell_script_header = customized_script_header_template(
resources["strategy"]["customized_script_header_template_file"],
resources,
)
else:
shell_script_header = shell_script_header_template
return shell_script_header

def do_submit(self, job):
Expand Down
15 changes: 13 additions & 2 deletions dpdispatcher/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dpdispatcher import dlog
from dpdispatcher.JobStatus import JobStatus
from dpdispatcher.machine import Machine, script_command_template
from dpdispatcher.utils import RetrySignal, retry
from dpdispatcher.utils import RetrySignal, customized_script_header_template, retry

# from dpdispatcher.submission import Resources

Expand Down Expand Up @@ -48,7 +48,18 @@ def gen_script_header(self, job):
] = f"#SBATCH --partition {resources.queue_name}"
else:
script_header_dict["slurm_partition_line"] = ""
slurm_script_header = slurm_script_header_template.format(**script_header_dict)
if (
resources["strategy"].get("customized_script_header_template_file")
is not None
):
slurm_script_header = customized_script_header_template(
resources["strategy"]["customized_script_header_template_file"],
resources,
)
else:
slurm_script_header = slurm_script_header_template.format(
**script_header_dict
)
return slurm_script_header

@retry()
Expand Down
21 changes: 15 additions & 6 deletions dpdispatcher/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,9 @@ class Resources:
Usually, this option will be used with Task.task_need_resources variable simultaneously.
ratio_unfinished : float
The ratio of `task` that can be unfinished.
customized_script_header_template_file : str
The customized template file to generate job submitting script header,
which overrides the default file.
para_deg : int
Decide how many tasks will be run in parallel.
Usually run with `strategy['if_cuda_multi_devices']`
Expand Down Expand Up @@ -1007,12 +1010,8 @@ def __init__(
# if self.gpu_per_node > 1:
# self.in_para_task_num = 0

if "if_cuda_multi_devices" not in self.strategy:
self.strategy["if_cuda_multi_devices"] = default_strategy.get(
"if_cuda_multi_devices"
)
if "ratio_unfinished" not in self.strategy:
self.strategy["ratio_unfinished"] = default_strategy.get("ratio_unfinished")
for kk, value in default_strategy.items():
self.strategy.setdefault(kk, value)
if self.strategy["if_cuda_multi_devices"] is True:
if gpu_per_node < 1:
raise RuntimeError(
Expand Down Expand Up @@ -1121,6 +1120,10 @@ def arginfo(detail_kwargs=True):
"Usually, this option will be used with Task.task_need_resources variable simultaneously."
)
doc_ratio_unfinished = "The ratio of `tasks` that can be unfinished."
doc_customized_script_header_template_file = (
"The customized template file to generate job submitting script header, "
"which overrides the default file."
)

strategy_args = [
Argument(
Expand All @@ -1137,6 +1140,12 @@ def arginfo(detail_kwargs=True):
default=0.0,
doc=doc_ratio_unfinished,
),
Argument(
"customized_script_header_template_file",
str,
optional=True,
doc=doc_customized_script_header_template_file,
),
]
doc_strategy = "strategies we use to generation job submitting scripts."
strategy_format = Argument(
Expand Down
14 changes: 13 additions & 1 deletion dpdispatcher/utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import base64
import hashlib
import hmac
import os
import struct
import subprocess
import time
from typing import Callable, Optional, Type, Union
from typing import TYPE_CHECKING, Callable, Optional, Type, Union

from dpdispatcher import dlog

if TYPE_CHECKING:
from dpdispatcher import Resources


def get_sha256(filename):
"""Get sha256 of a file.
Expand Down Expand Up @@ -193,3 +197,11 @@ def wrapper(*args, **kwargs):
return wrapper

return decorator


def customized_script_header_template(
filename: os.PathLike, resources: "Resources"
) -> str:
with open(filename) as f:
template = f.read()
return template.format(**resources.serialize())
Loading

0 comments on commit bf0ba1b

Please sign in to comment.