-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsge-where
executable file
·237 lines (205 loc) · 8.62 KB
/
sge-where
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#! /usr/bin/env python
"""
List the hosts where a given job is running.
"""
import logging
import os
import subprocess
import sys
## auxiliary functions
try:
    from collections import defaultdict
except ImportError:
    class defaultdict(dict):
        """
        A backport of `defaultdict` to Python 2.4.

        See http://docs.python.org/library/collections.html

        Looking up a missing key calls the `default_factory` given at
        construction time, stores the result under that key and
        returns it:

        >>> dl = defaultdict(list)
        >>> dl['emptylist']
        []
        >>> di = defaultdict(int)
        >>> di['zero']
        0
        >>> ds = defaultdict(str)
        >>> ds['empty']
        ''

        If no factory was given, looking up a missing key raises
        `KeyError`, like a plain `dict` does.
        """

        def __new__(cls, default_factory=None):
            return dict.__new__(cls)

        def __init__(self, default_factory=None):
            # `None` means "no default": missing keys raise `KeyError`
            self.default_factory = default_factory

        def __missing__(self, key):
            """Return the default value for `key`; raise `KeyError` if there is no factory."""
            if self.default_factory is None:
                raise KeyError("Key '%s' not in dictionary" % key)
            # errors raised by the factory itself propagate unchanged,
            # as they do with the standard library class
            return self.default_factory()

        def __getitem__(self, key):
            # store the default on first access, so that mutable
            # defaults (e.g., lists) can be updated in place
            if not dict.__contains__(self, key):
                dict.__setitem__(self, key, self.__missing__(key))
            return dict.__getitem__(self, key)
def capture(cmd):
    """
    Run `cmd` through the shell and return its output as a string.

    Standard error is merged into standard output, so the returned
    text contains both streams.  The exit status of the command is
    not checked.

    On Python 3 the pipe delivers `bytes`; they are decoded here so
    that callers can always process the result with `str` methods.
    """
    logging.info('Running: "%s" ...', cmd)
    proc = subprocess.Popen(cmd, shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    output = proc.communicate()[0]
    if not isinstance(output, str):
        # Python 3 only (on Python 2 the output already is a `str`);
        # decode leniently so a stray byte cannot abort the run
        output = output.decode('utf-8', 'replace')
    return output
def parse_qhost_output(text):
    """
    Parse output from `qhost -j` and return a dictionary mapping
    job IDs into the list of SGE queues where each job is running.

    Each value is a list of *(host name, queue name)* pairs: one pair
    for the job line itself, plus one for every continuation line
    that follows it.  The host name is taken from the closest
    preceding host line; the queue name is the part of the queue
    instance name before the ``@`` sign.

    If a line cannot be parsed, an error is logged and the exception
    is re-raised.

    .. TO-DO:: rewrite this function to parse `qhost`'s XML output
    """
    jobs = defaultdict(list)
    for line in text.split('\n'):
        line = line.rstrip()
        if (# skip header lines and inactive/unknown hosts
            line == ''
            # e.g.:
            #   -------------------------------------------------------------------------------
            #   global - - - - - - -
            or line.endswith('-')
            # e.g.:
            #   HOSTNAME ARCH NCPU LOAD MEMTOT MEMUSE SWAPTO SWAPUS
            or line.startswith('HOSTNAME')
            # e.g.:
            #   job-ID prior name user state submit/start at queue master ja-task-ID
            or line.endswith('ja-task-ID')):
            continue

        if line.startswith(' ') or line.startswith('\t'):
            # indented lines describe jobs running on the current host
            line = line.lstrip()
            try:
                if line[0].isdigit():
                    # full job line, e.g.:
                    #   2526245 0.52470 tf-024 username r 01/31/2013 10:35:49 short.q@r0 SLAVE
                    (jobid, _prior, _name, _user, _state,
                     _day_started, _time_started,
                     queue, _rest) = line.split(None, 8)
                else:
                    # continuation line: belongs to the job seen last and
                    # starts with its own queue instance name, e.g.:
                    #   short.q@r0 SLAVE
                    queue = line.split(None, 1)[0]
                # queue instance name is truncated at 10 characters,
                # so only the part before the `@` is reliable
                queuename = queue.split('@', 1)[0]
                jobs[jobid].append((hostname, queuename))
            except Exception:
                logging.error("Cannot parse line '%s'", line)
                raise
        else:
            # host line, e.g.:
            #   r01c01b01n01 lx24-amd64 8 1.37 23.6G 2.8G 4.0G 0.0
            try:
                (hostname, _arch, _ncpu, _load,
                 _memtot, _memuse, _swapto, _swapus) = line.split()
            except Exception:
                logging.error("Cannot parse line '%s'", line)
                raise
    return jobs
def summarize(hqs):
    """
    Scan a list of pairs (host, queue) and return a dictionary
    counting occurrences of each distinct one.

    The returned dictionary maps each distinct item of `hqs` to the
    number of times it appears; it is empty if `hqs` is empty.

    Examples::

      >>> counts = summarize([ ('n90', 'short.q'), ('n90', 'short.q'), ('n91', 'long.q') ])
      >>> sorted(counts.items())
      [(('n90', 'short.q'), 2), (('n91', 'long.q'), 1)]
      >>> sorted(summarize([]).items())
      []
    """
    count = defaultdict(int)
    for hq in hqs:
        count[hq] += 1
    return count
def print_job_location(jobid, hqs, brief):
    """
    Write the location(s) of one job to standard output.

    `jobid` is the job ID printed in the first column.  `hqs` is a
    list of *(host name, queue name)* pairs.  `brief` is a boolean:
    when false, one line is printed per pair; when true, equal pairs
    are collapsed into a single line and the number of occurrences is
    appended as a fourth column.
    """
    if not brief:
        for host, queue in hqs:
            print ("%-12s %-16s %-10s" % (jobid, host, queue))
        return

    for location, occurrences in summarize(hqs).items():
        host, queue = location
        print ("%-12s %-16s %-10s %s" % (jobid, host, queue, occurrences))
## main
from optparse import OptionParser
def main():
    """
    Command-line entry point: print the hosts where jobs are running.

    Exactly one positional argument is expected.  If it starts with a
    digit it is taken to be a job ID: the output of ``qhost -j`` is
    parsed and the locations of that job are printed.  Otherwise it
    is taken to be a user name: the output of ``qhost -u USERNAME``
    is parsed and the locations of every job found are printed.

    The process exits with status 1 if the argument is missing or
    empty, or if more than one is given.  With option ``--selftest``
    the internal tests are run instead and their return code becomes
    the exit status.
    """
    parser = OptionParser(usage="Usage: %prog [options] {JOBID|USERNAME}",
                          version="%prog 1.0")
    # NOTE: `--verbose` is accepted, but nothing in this function reads it
    parser.add_option("-v", "--verbose",
                      action="store_true",
                      dest="verbose",
                      default=False,
                      help="Also print queue names.")
    parser.add_option("-c", "--summarize",
                      action="store_true",
                      dest="summarize",
                      default=False,
                      help="Shorten list by collapsing equal entries.")
    parser.add_option("--selftest",
                      action="store_true",
                      dest="selftest",
                      default=False,
                      help="Run internal code tests and no other action.")
    (options, args) = parser.parse_args()

    if options.selftest:
        sys.exit(run_selftest())

    # an empty string is neither a job ID nor a user name; reject it
    # here, otherwise `arg[0]` below would raise `IndexError`
    if len(args) != 1 or not args[0]:
        logging.critical(
            "Need exactly one argument, either JOBID or USERNAME."
            " Please run this command with option `--help` to read usage help.")
        sys.exit(1)

    # determine whether the argument is a JOBID or a USERNAME
    arg = args[0]
    if arg[0].isdigit():
        # arg is job id
        jobs = parse_qhost_output(capture('qhost -j'))
        print_job_location(arg, jobs[arg], options.summarize)
    else:
        # arg is username
        # NOTE: `arg` is interpolated verbatim into a shell command line
        jobs = parse_qhost_output(capture('qhost -u %s' % arg))
        for jobid, hqs in jobs.items():
            print_job_location(jobid, hqs, options.summarize)
## test code

def run_selftest():
    """
    Run the internal tests and return a process exit code.

    First `test_parse_qhost_output` is called (a failure there
    propagates as `AssertionError`), then the doctests found in this
    module are run.  Return 0 if no doctest failed, 1 otherwise.
    """
    test_parse_qhost_output()
    import doctest
    result = doctest.testmod(name='sge_where',
                             optionflags=doctest.NORMALIZE_WHITESPACE)
    # `testmod` returns a (number of failures, number of tests) pair
    if result[0] > 0:
        return 1
    return 0


def test_parse_qhost_output():
    """
    Check `parse_qhost_output` against a sample of `qhost -j` output.

    Raise `AssertionError` if the parsed result is not the expected one.
    """
    # extract from actual output on Schroedinger; job lines must keep
    # their leading whitespace, since that is how the parser tells
    # them apart from host lines
    jobs = parse_qhost_output("""
HOSTNAME ARCH NCPU LOAD MEMTOT MEMUSE SWAPTO SWAPUS
-------------------------------------------------------------------------------
global - - - - - - -
r01c01b01n01 lx24-amd64 8 8.00 23.6G 1.0G 4.0G 29.2M
r01c02b01n02 lx24-amd64 8 8.00 23.6G 5.9G 4.0G 10.5M
   job-ID prior name user state submit/start at queue master ja-task-ID
   ----------------------------------------------------------------------------------------------
   2526245 0.52470 tf-024 username r 01/31/2013 10:35:49 short.q@r0 SLAVE
                                                         short.q@r0 SLAVE
                                                         short.q@r0 SLAVE
                                                         short.q@r0 SLAVE
                                                         short.q@r0 SLAVE
                                                         short.q@r0 SLAVE
                                                         short.q@r0 SLAVE
                                                         short.q@r0 SLAVE
r01c02b02n01 lx24-amd64 8 8.31 23.6G 7.9G 4.0G 25.8M
""")
    assert (jobs['2526245'] == ([('r01c02b01n02', 'short.q')] * 8))


# The entry-point guard has to come last: `main()` looks up
# `run_selftest` when option `--selftest` is given, so that function
# must already be defined by the time `main()` is called.
if __name__ == '__main__':
    main()