-
Notifications
You must be signed in to change notification settings - Fork 1
/
confirm_jobIDs.py
executable file
·373 lines (291 loc) · 11.3 KB
/
confirm_jobIDs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
#!/usr/bin/env python3
# ===============================================================================
#
# confirm_jobIDs.py - Confirms throughput of jobIDs in all stages of processing.
#
# ARGS:
# 1st: the BillingConfig spreadsheet.
#
# SWITCHES:
# --accounting_file: Location of accounting file (overrides BillingConfig.xlsx)
# --billing_details_file: Location of the BillingDetails.xlsx file (default=look in BillingRoot/<year>/<month>)
# --billing_root: Location of BillingRoot directory (overrides BillingConfig.xlsx)
# [default if no BillingRoot in BillingConfig.xlsx or switch given: CWD]
# --year: Year of snapshot requested. [Default is this year]
# --month: Month of snapshot requested. [Default is last month]
#
# OUTPUT:
# Text to stdout regarding the flow of jobIDs from accounting file
# to BillingDetails file to BillingNotifs files.
#
# ASSUMPTIONS:
# BillingNotifs files corresponding to all the PI tags within the BillingConfig
# file are in the BillingRoot directory.
#
# AUTHOR:
# Keith Bettinger
#
#==============================================================================
#=====
#
# IMPORTS
#
#=====
import argparse
from collections import defaultdict
import datetime
import os
import os.path
import sys
import xlrd
from functools import reduce
# Simulate an "include billing_common.py": compile that file's source and
# execute it in this module's namespace, so the constants and helper
# functions it defines become globals of this script.
# NOTE(review): exec runs whatever billing_common.py contains; this is only
# safe because the file is read from this script's own directory.
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
_billing_common_path = os.path.join(SCRIPT_DIR, "billing_common.py")
# Read the source inside a context manager so the file handle is closed
# promptly instead of being left for the garbage collector.
with open(_billing_common_path, "rb") as _billing_common_fp:
    exec(compile(_billing_common_fp.read(), _billing_common_path, 'exec'))
from job_accounting_file import JobAccountingFile
#=====
#
# GLOBALS
#
#=====
# In billing_common.py
# A `global` statement at module level has no runtime effect; these lines only
# record which constant names this script expects the exec of
# billing_common.py to have placed in its namespace.
global SLURMACCOUNTING_PREFIX
global BILLING_DETAILS_PREFIX
global BILLING_NOTIFS_PREFIX
#=====
#
# CONSTANTS
#
#=====
#=====
#
# FUNCTIONS
#
#=====
# from billing_common.py
# A `global` statement at module level has no runtime effect; these lines only
# record which helper functions this script expects the exec of
# billing_common.py to have placed in its namespace.
global read_config_sheet
global from_excel_date_to_timestamp
global from_excel_date_to_date_string
global from_ymd_date_to_timestamp
global sheet_get_named_column
def get_pi_tag_list(billing_config_wkbk):
    """Return the PI tags from the "PIs" sheet that are active in the month.

    Reads the "PI Tag", "Date Added" and "Date Removed" columns of the "PIs"
    sheet and drops every tag whose row was added at or after
    end_month_timestamp, or removed before begin_month_timestamp.  Both
    timestamps are module-level values set by the script body.  Each dropped
    tag is reported on stderr.

    billing_config_wkbk: the opened BillingConfig workbook.
    Returns the (filtered) list of PI tags.
    """
    pis_sheet = billing_config_wkbk.sheet_by_name("PIs")

    pi_tag_list = sheet_get_named_column(pis_sheet, "PI Tag")
    added_column = sheet_get_named_column(pis_sheet, "Date Added")
    removed_column = sheet_get_named_column(pis_sheet, "Date Removed")

    # Snapshot the rows up front: pi_tag_list is edited inside the loop, so
    # it must not be the sequence being iterated.
    rows = tuple(zip(pi_tag_list, added_column, removed_column))

    for tag, added, removed in rows:
        # Convert the Excel dates to timestamps.
        added_ts = from_excel_date_to_timestamp(added)
        if removed == '':
            # No removal date: treat the PI as removed after this month.
            removed_ts = end_month_timestamp + 1
        else:
            removed_ts = from_excel_date_to_timestamp(removed)

        if added_ts >= end_month_timestamp:
            # Added only after the month under review ended.
            print(" *** Ignoring PI %s: added after this month on %s" % (tag, from_excel_date_to_date_string(added)), file=sys.stderr)
            pi_tag_list.remove(tag)
            continue

        if removed_ts < begin_month_timestamp:
            # Already removed before the month under review began.
            print(" *** Ignoring PI %s: removed before this month on %s" % (tag, from_excel_date_to_date_string(removed)), file=sys.stderr)
            pi_tag_list.remove(tag)

    return pi_tag_list
def read_jobIDs(wkbk, sheet_name):
    """Return the "Job ID" column of sheet `sheet_name` in `wkbk` as ints.

    wkbk: an opened workbook providing sheet_by_name().
    sheet_name: name of the sheet to read.
    Returns a list with every cell of the column converted via int().
    """
    job_id_cells = sheet_get_named_column(wkbk.sheet_by_name(sheet_name), "Job ID")
    return list(map(int, job_id_cells))
def print_set(my_set, max_elts=10000):
    """Print the elements of `my_set` to stdout, ten per line.

    A newline is emitted before every group of ten elements (including the
    first), and each element is followed by a single space.  At most
    `max_elts` elements are printed.

    my_set: any iterable of printable elements.
    max_elts: maximum number of elements to print.

    If every element was printed, the output is finished with a newline; if
    the output was cut short at `max_elts`, the last line is left open.
    """
    # enumerate() advances the count on every element.  The previous
    # hand-rolled counter was only incremented inside the "start a new line"
    # branch, so it stuck at 1 and neither the line wrapping nor the
    # max_elts limit worked.
    for elt_count, elt in enumerate(my_set):
        # Check the limit first so truncation never leaves an empty line.
        if elt_count >= max_elts:
            break
        if elt_count % 10 == 0:
            print()
        print("%s" % elt, end=' ')
    else:
        # Loop ran to completion (no truncation): close the final line.
        print()
#=====
#
# SCRIPT BODY
#
#=====
# Build the command-line interface: one positional workbook argument, three
# optional path overrides, the year/month selectors, and a verbosity flag.
parser = argparse.ArgumentParser()

parser.add_argument("billing_config_file", help='The BillingConfig file')

# The path-override switches all share the same shape (optional, default None),
# so they are declared from a table; order is kept for the --help listing.
_PATH_SWITCHES = (
    ("-a", "--accounting_file", 'The SGE accounting file to read [default = None]'),
    ("-d", "--billing_details_file", 'The BillingDetails file'),
    ("-r", "--billing_root", 'The Billing Root directory [default = None]'),
)
for _short_flag, _long_flag, _help_text in _PATH_SWITCHES:
    parser.add_argument(_short_flag, _long_flag, default=None, help=_help_text)

# Year and month are restricted to fixed integer ranges; None means
# "not given" and is resolved to a concrete value after parsing.
parser.add_argument(
    "-y", "--year",
    type=int,
    choices=list(range(2013, 2031)),
    default=None,
    help="The year to be filtered out. [default = this year]",
)
parser.add_argument(
    "-m", "--month",
    type=int,
    choices=list(range(1, 13)),
    default=None,
    help="The month to be filtered out. [default = last month]",
)

parser.add_argument(
    "-v", "--verbose",
    action="store_true",
    default=False,
    help='Get chatty [default = false]',
)

args = parser.parse_args()
#
# Process arguments.
#
# Resolve the year and month under review, then the timestamps bounding it.
#
# Take a single snapshot of today's date.  Calling date.today() separately for
# the year and for the month could straddle midnight at New Year and combine
# the old year with the new month.
today = datetime.date.today()

# Do year first, because the month defaulting below might modify it.
year = today.year if args.year is None else args.year

# Do month now, and decrement year if we want last month and this month is Jan.
# NOTE(review): the decrement also applies when --year was given explicitly
# but --month was not; confirm that is intended.
if args.month is not None:
    month = args.month
else:
    # No month given: use last month.
    this_month = today.month
    if this_month == 1:
        # This month is Jan, so last month was Dec. of the previous year.
        month = 12
        year -= 1
    else:
        month = this_month - 1

# Calculate next month, which marks the end of the range for this month.
if month == 12:
    next_month, next_month_year = 1, year + 1
else:
    next_month, next_month_year = month + 1, year

# The begin_ and end_month_timestamps are to be used as follows:
#   date is within the month if begin_month_timestamp <= date < end_month_timestamp
# Both values should be UTC.
begin_month_timestamp = from_ymd_date_to_timestamp(year, month, 1)
end_month_timestamp = from_ymd_date_to_timestamp(next_month_year, next_month, 1)
#
# Open the Billing Config workbook.
#
billing_config_wkbk = xlrd.open_workbook(args.billing_config_file)

#
# Decide on the BillingRoot directory.  Precedence: the --billing_root switch,
# then the value from the Config sheet, then the current working directory.
# (The second value returned by read_config_sheet is ignored here.)
#
billing_root, _ = read_config_sheet(billing_config_wkbk)
if args.billing_root is not None:
    billing_root = args.billing_root
elif billing_root is None:
    billing_root = os.getcwd()

# Path of the YEAR/MONTH directory within BillingRoot (only the path is built
# here; nothing is created on disk).
year_month_dir = os.path.join(billing_root, str(year), "%02d" % month)

# Accounting file: the switch wins, otherwise use the file in the YEAR/MONTH dir.
accounting_file = args.accounting_file
if accounting_file is None:
    accounting_filename = "%s.%d-%02d.txt" % (SLURMACCOUNTING_PREFIX, year, month)
    accounting_file = os.path.join(year_month_dir, accounting_filename)

# BillingDetails file: the switch wins, otherwise use the file in the YEAR/MONTH dir.
billing_details_file = args.billing_details_file
if billing_details_file is None:
    billing_details_file = os.path.join(year_month_dir, "%s.%d-%02d.xlsx" % (BILLING_DETAILS_PREFIX, year, month))
#
# Read all JobIDs from accounting file.
#
# Collect the job_id of every record yielded by the accounting file.
accounting_file_fp = JobAccountingFile(accounting_file)
accounting_jobIDs = [record.job_id for record in accounting_file_fp]

#
# Read all JobIDs from BillingDetails file.
#  Sheets:
#    Computing (Billable Jobs), plus any overflow sheets "Computing 2", "Computing 3", ...
#    Nonbillable Jobs
#    Failed Jobs
#
billing_details_wkbk = xlrd.open_workbook(billing_details_file)

details_billable_jobIDs = read_jobIDs(billing_details_wkbk, 'Computing')

# Probe for numbered overflow sheets, starting at 2, until one is missing.
computing_extra_page = 2
while True:
    more_computing_details_sheet_name = "Computing %d" % computing_extra_page
    try:
        # Existence check only; the sheet itself is read just below.
        billing_details_wkbk.sheet_by_name(more_computing_details_sheet_name)
    except xlrd.biffh.XLRDError:
        # No more computing sheets: stop probing.
        break
    else:
        details_billable_jobIDs.extend(
            read_jobIDs(billing_details_wkbk, more_computing_details_sheet_name)
        )
        computing_extra_page += 1

details_nonbillable_jobIDs = read_jobIDs(billing_details_wkbk, 'Nonbillable Jobs')
details_failed_jobIDs = read_jobIDs(billing_details_wkbk, 'Failed Jobs')
#
# For each PI tag, read the BillingNotifs file.
# Sheet: Computing Details
#
# PI tags active in the month under review.
pi_tag_list = get_pi_tag_list(billing_config_wkbk)

# PI tag -> list of the jobIDs found in that PI's BillingNotifs workbook.
pi_tag_jobIDs_dict = defaultdict(list)

for pi_tag in pi_tag_list:
    # Each PI's BillingNotifs workbook lives in the YEAR/MONTH directory.
    notifs_wkbk_pathname = os.path.join(
        year_month_dir,
        "%s-%s.%s-%02d.xlsx" % (BILLING_NOTIFS_PREFIX, pi_tag, year, month),
    )
    billing_notifs_wkbk = xlrd.open_workbook(notifs_wkbk_pathname)

    # The jobIDs are in the "Computing Details" sheet.
    pi_tag_jobIDs_dict[pi_tag].extend(
        read_jobIDs(billing_notifs_wkbk, "Computing Details")
    )
#
# Analyze the JobID sources.
#
# Build the four jobID sets that get compared.
# NOTE(review): the spreadsheet jobIDs are ints (read_jobIDs converts them);
# the comparisons assume accounting_record.job_id is an int too -- confirm.

# Unique the accounting JobIDs.
accounting_all_jobID_set = set(accounting_jobIDs)
# Unique the BillingDetails Billable JobIDs.
details_billable_jobID_set = set(details_billable_jobIDs)
# Aggregate the BillingDetails JobIDs (billable, nonbillable, failed) and unique them.
details_all_jobID_set = details_billable_jobID_set.union(details_nonbillable_jobIDs,
                                                         details_failed_jobIDs)
# Aggregate the BillingNotifs JobIDs and unique them.
# set().union(*lists) also works when there are no PI tags at all; a reduce()
# over the (then empty) list of per-PI lists would raise TypeError.
notifs_all_jobID_set = set().union(*pi_tag_jobIDs_dict.values())

# Compare:
#  The accounting JobIDs
#  The BillingDetails JobID aggregate
# They should be the same.
# The symmetric difference is the set of jobIDs in either accounting or
# details, but not both.
accounting_symdiff_details = accounting_all_jobID_set ^ details_all_jobID_set

print("NOT IN BOTH ACCOUNTING AND DETAILS: %d" % (len(accounting_symdiff_details)))
print()
if accounting_symdiff_details:
    print("In accounting only:")
    print_set(accounting_all_jobID_set - details_all_jobID_set, 100)
    print()
    print("In details only:")
    print_set(details_all_jobID_set - accounting_all_jobID_set, 100)
    print()

# Compare:
#  The BillingDetails Billable JobIDs
#  The BillingNotifs JobID aggregate
# They should be the same.
# The symmetric difference is the set of jobIDs in either billable details or
# notifs, but not both.
details_billable_symdiff_notifs = details_billable_jobID_set ^ notifs_all_jobID_set

print("NOT IN BILLABLE DETAILS AND NOTIFS: %d" % (len(details_billable_symdiff_notifs)))
print()
if details_billable_symdiff_notifs:
    print("In billable details only:")
    print_set(details_billable_jobID_set - notifs_all_jobID_set)
    print()
    print("In notifs only:")
    print_set(notifs_all_jobID_set - details_billable_jobID_set)
    print()

# Final verdict: exit status is nonzero if either comparison found differences.
if accounting_symdiff_details or details_billable_symdiff_notifs:
    print()
    print("JOBS INCONSISTENT AMONG FILES")
    sys.exit(-1)
else:
    print()
    print("ALL JOBS ARE IN ALL FILES")
    sys.exit(0)