-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlaunch.py
122 lines (97 loc) · 4 KB
/
launch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import hydra
from omegaconf import DictConfig
import base64
from kubernetes import client, config
from kubejobs.jobs import KubernetesJob, KueueQueue
from consistency.config import EvalConfig
def check_if_completed(job_name: str, namespace: str = "informatics") -> bool:
    """Return True when `job_name` is absent from `namespace` or has completed.

    A job with a "Complete" condition is deleted (foreground propagation) so
    its name can be reused by a new launch; a job that does not exist at all
    is likewise reported as completed. A running or failed job returns False.

    Args:
        job_name: Name of the Kubernetes Job to inspect.
        namespace: Namespace the job lives in (defaults to "informatics").
    """
    # Load the kube config and create a batch-API client.
    config.load_kube_config()
    api = client.BatchV1Api()

    # A missing job counts as "completed": nothing blocks a relaunch.
    # (Early return replaces the original redundant `job_exists` flag.)
    jobs = api.list_namespaced_job(namespace)
    if job_name not in {job.metadata.name for job in jobs.items}:
        return True

    job = api.read_namespaced_job(job_name, namespace)
    is_completed = False
    # Inspect the job's status conditions to decide its fate.
    if job.status.conditions:
        for condition in job.status.conditions:
            if condition.type == "Complete" and condition.status == "True":
                is_completed = True
            elif condition.type == "Failed" and condition.status == "True":
                # NOTE(review): a failed job is neither deleted nor reported
                # as completed, so it blocks relaunching under this name
                # until removed manually — confirm this is intended.
                print(f"Job {job_name} has failed.")
    else:
        print(f"Job {job_name} still running or status is unknown.")

    if is_completed:
        # Delete the finished job so a new one with the same name can start.
        api_res = api.delete_namespaced_job(
            name=job_name,
            namespace=namespace,
            body=client.V1DeleteOptions(propagation_policy="Foreground"),
        )
        print(f"Job '{job_name}' deleted. Status: {api_res.status}")
    return is_completed
def send_message_command(cfg: EvalConfig):
    """Build a shell snippet that posts a "job started" message to Slack.

    The webhook URL is read from the Kubernetes secret referenced by
    cfg.launch.env_vars["SLACK_WEBHOOK"] (base64-decoded), then appended
    to a curl call terminated by " ; " so it can prefix the job command.
    """
    # Fetch the webhook secret from the cluster.
    config.load_kube_config()
    core_api = client.CoreV1Api()
    webhook_ref = cfg.launch.env_vars["SLACK_WEBHOOK"]
    secret_data = core_api.read_namespaced_secret(
        webhook_ref["secret_name"], "informatics"
    ).data
    webhook_url = base64.b64decode(secret_data[webhook_ref["key"]]).decode("utf-8")

    # $POD_NAME is expanded at runtime inside the pod, not here.
    curl_prefix = """curl -X POST -H 'Content-type: application/json' --data '{"text":"Job started in '"$POD_NAME"'"}' """
    return curl_prefix + webhook_url + " ; "
def export_env_vars(cfg: "EvalConfig") -> str:
    """Build a shell prefix that re-exports each configured env var.

    For every key in cfg.launch.env_vars this emits `export KEY=$KEY`,
    chained with `&&` and terminated by " ; " so it can be prepended to
    the job command. Returns just " ; " when there are no env vars.
    """
    # str.join replaces the original trailing-separator + strip(" &&")
    # pattern: strip() removes a *character set* (space and '&'), not a
    # suffix, which is fragile. Output is identical for all inputs.
    exports = " && ".join(f"export {key}=${key}" for key in cfg.launch.env_vars)
    return exports + " ; "
@hydra.main(config_path="configs", config_name="base", version_base=None)
def main(cfg: DictConfig):
    """Launch the consistency eval job, unless a previous run is still going."""
    cfg = EvalConfig(**dict(cfg))
    job_name = cfg.launch.job_name

    # Guard clause: only (re)launch when the prior job is gone or finished.
    is_completed = check_if_completed(job_name, namespace=cfg.launch.namespace)
    if not is_completed:
        print(f"Job '{job_name}' is still running.")
        return

    print(f"Job '{job_name}' is completed. Launching a new job.")

    # Append every consistency.* setting as a hydra override on the command.
    overrides = "".join(
        f" ++consistency.{key}={value}"
        for key, value in dict(cfg)["consistency"].items()
    )
    command = cfg.launch.command + overrides
    print(f"Command: {command}")

    # Create a Kubernetes Job with a name, container image, and command
    print(f"Creating job for: {command}")
    job = KubernetesJob(
        name=job_name,
        cpu_request=cfg.launch.cpu_request,
        ram_request=cfg.launch.ram_request,
        image="docker.io/gautierdag/consistency:latest",
        gpu_type="nvidia.com/gpu",
        gpu_limit=cfg.launch.gpu_limit,
        gpu_product=cfg.launch.gpu_product,
        backoff_limit=0,
        command=["/bin/bash", "-c", "--"],
        # Env exports + Slack notification are prepended to the run command.
        args=[export_env_vars(cfg) + send_message_command(cfg) + command],
        user_email="gautier.dagan@ed.ac.uk",
        namespace=cfg.launch.namespace,
        kueue_queue_name=KueueQueue.INFORMATICS,
        secret_env_vars=cfg.launch.env_vars,
        volume_mounts={
            "nfs": {"mountPath": "/nfs", "server": "10.24.1.255", "path": "/"}
        },
    )
    print(job.generate_yaml())
    # Run the Job on the Kubernetes cluster
    job.run()
# Script entry point: hydra parses CLI overrides and invokes main().
if __name__ == "__main__":
    main()