-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathcluster_status.py
127 lines (100 loc) · 4.17 KB
/
cluster_status.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import argparse
import datetime
import os.path
import sys
import time
import cluster_commands
import try_command
def parse_args():
    """Parse the command line arguments of the status script.

    Returns:
        A dict with the keys:
          'job_id': the cluster id given on the command line.
          'retry_status_interval_seconds': a list of ints parsed from the
            "," separated option value (empty when the option is omitted).
          'resource_usage_dir': the value of --resource-usage-dir (or None).
          'resource_usage_min_interval': the value of
            --resource-usage-min-interval (defaults to 120).

    Raises:
        ValueError: if a non-empty entry of --retry-status-interval-seconds
          is not an integer.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'job_id', help='the cluster id of the job to check the status of')
    parser.add_argument(
        '--retry-status-interval-seconds',
        default='',
        help='a "," separated list of integers representing'
        ' the number of seconds to wait after sequential failed'
        ' job status commands before retrying')
    parser.add_argument(
        '--resource-usage-dir',
        help='a directory for storing the file paths where the resource usage'
        ' of each job should be logged')
    parser.add_argument(
        '--resource-usage-min-interval',
        type=float,
        default=120,
        help='only log the resource usage if it has been at least this many'
        ' seconds since the last log')
    args = parser.parse_args()

    # ''.split(',') yields [''], so the default value (and any trailing or
    # doubled comma) produces empty entries. Skip them rather than passing
    # them to int(), which would raise ValueError.
    retry_status_interval_seconds = [
        int(int_str)
        for int_str in args.retry_status_interval_seconds.split(',')
        if int_str.strip()
    ]

    return {
        'job_id': args.job_id,
        'retry_status_interval_seconds': retry_status_interval_seconds,
        'resource_usage_dir': args.resource_usage_dir,
        'resource_usage_min_interval': args.resource_usage_min_interval,
    }
def extract_job_info(stdout, job_id):
    """Extract the job info from the output of the status command.

    The extraction itself is delegated to
    cluster_commands.try_extract_job_info_from_status_output. If it reports
    an error, the error and the raw stdout are printed to stderr and the
    process exits with code 1.

    Returns:
        The extracted info with its 'resource_usage' entry prefixed by the
        current local time in ISO format.
    """
    info, error = cluster_commands.try_extract_job_info_from_status_output(
        stdout, job_id)
    if error:
        message = 'error: {}\n{}'.format(error, stdout)
        print(message, file=sys.stderr)
        sys.exit(1)

    # Stamp the usage with the time of this status check.
    timestamp = datetime.datetime.now().isoformat()
    reported_usage = info['resource_usage']
    info['resource_usage'] = 'current_time: {}, {}'.format(
        timestamp, reported_usage)
    return info
def update_resource_log(status, resource_usage, resource_dir,
                        min_interval_seconds, job_id):
    """Log the resource usage of a job if a log file is registered for it.

    resource_dir is expected to hold a '<job_id>.txt' file whose content is
    the path of the resource log for that job. Nothing happens when
    resource_dir is unset or not a directory, or when that file is missing.

    Any status other than 'running' is treated as the final update: the
    min interval check is bypassed and the '<job_id>.txt' file is removed
    afterwards.
    """
    if not resource_dir or not os.path.isdir(resource_dir):
        return

    job_pointer_path = os.path.join(resource_dir, '{}.txt'.format(job_id))
    if not os.path.exists(job_pointer_path):
        return

    # The pointer file stores the path of the actual usage log.
    with open(job_pointer_path, 'rt') as pointer_handle:
        usage_log_path = pointer_handle.read().strip()

    job_is_done = status != 'running'
    update_resource_log_with_file(resource_usage, min_interval_seconds,
                                  usage_log_path, job_is_done)
    if job_is_done:
        os.remove(job_pointer_path)
def update_resource_log_with_file(resource_usage, min_interval_seconds,
                                  resource_log_file, is_final_update):
    """Append resource_usage as one line to resource_log_file.

    Nothing is written when:
      * resource_usage is empty,
      * resource_log_file does not end with '.cluster.usage',
      * the directory of resource_log_file does not exist, or
      * the log already exists, this is not the final update, and the log
        was modified less than min_interval_seconds ago.
    """
    if not resource_usage:
        return

    # resource_log_file is created and written to by this function.
    # Its directory should have been created by snakemake when the job was
    # submitted.
    parent_dir = os.path.dirname(resource_log_file)
    if not resource_log_file.endswith('.cluster.usage'):
        return
    if not os.path.isdir(parent_dir):
        return

    # The first write creates the file and is never throttled; the final
    # update is never throttled either.
    should_check_interval = (min_interval_seconds and not is_final_update
                             and os.path.exists(resource_log_file))
    if should_check_interval:
        last_write_seconds = os.path.getmtime(resource_log_file)
        elapsed_seconds = time.time() - last_write_seconds
        if elapsed_seconds < min_interval_seconds:
            return

    line = '{}\n'.format(resource_usage)
    with open(resource_log_file, 'at') as log_handle:
        log_handle.write(line)
def run_status_command(command, retry_status_interval_seconds):
    """Run the status command through try_command.try_command.

    Returns:
        The stdout reported by try_command.try_command.

    Exits the process via sys.exit(error) when an error is reported.
    """
    result = try_command.try_command(command, retry_status_interval_seconds)
    stdout, error = result
    if not error:
        return stdout

    sys.exit(error)
def main():
    """Check the status of a cluster job and print it to stdout.

    The resource usage found in the status output is also passed on to
    update_resource_log before the status is printed.
    """
    args = parse_args()
    job_id = args['job_id']

    status_output = run_status_command(
        cluster_commands.status_command(job_id),
        args['retry_status_interval_seconds'])
    job_info = extract_job_info(status_output, job_id)

    status = job_info['status']
    update_resource_log(
        status,
        job_info['resource_usage'],
        args['resource_usage_dir'],
        args['resource_usage_min_interval'],
        job_id,
    )
    print(status)
# Run the status check only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()