forked from wikimedia/labs-tools-lists
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.py
113 lines (90 loc) · 3.55 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import os
import logging
import subprocess
import configparser
from datetime import datetime, timedelta, timezone
# Root directory that is scanned for list definitions (.cnf/.sql pairs)
querydir = '/data/project/lists/query'
# Root directory that receives the .run/.tmp/.out files of each list
outputdir = '/data/project/lists/output'

# Log to scheduler.log with timestamp, level and message
logging.basicConfig(
    filename='scheduler.log',
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
)

# Minimum interval between two executions, keyed by the 'frequency'
# value found in a list's configuration file
deltas = dict(
    default=timedelta(days=1),
    daily=timedelta(days=1),
    weekly=timedelta(weeks=1),
    monthly=timedelta(weeks=4),
    twiceweekly=timedelta(days=3, hours=12),
    twicemonthly=timedelta(weeks=2),
)
def _utc_now():
    """Return the current UTC time as a naive ``datetime``."""
    # Run files store naive UTC timestamps, so the tzinfo is dropped to
    # keep the value comparable with what is read back from disk.
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _parse_timestamp(value):
    """Parse a run-file timestamp written with or without microseconds."""
    try:
        return datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
    except ValueError:
        # Older run files were written with str(datetime), which omits
        # the fractional part when microsecond == 0.
        return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')


def process_list(cnf_path):
    """Run the query described by one configuration file if it is due.

    The paths of the SQL file, the run file and the output files are
    derived from ``cnf_path``. The ``[query]`` section of the
    configuration provides ``project`` (used to build the database host
    name) and ``frequency`` (a key of ``deltas``, or ``'none'`` to skip
    the list). The ``[output]`` section of the run file stores
    ``last_run``, ``last_runtime``, ``run_counter`` and
    ``total_runtime``; when it is missing or unreadable the list is
    treated as never run.

    Failures (invalid configuration, unknown frequency, failing mysql
    client, file errors while producing the output) are logged and make
    the function return without updating the run file.

    Args:
        cnf_path: Path of a configuration file ending in ``cnf``,
            located under ``querydir``.

    Returns:
        None.
    """
    # Create the paths
    sql_path = cnf_path[:-3] + 'sql'
    run_path = cnf_path.replace(querydir, outputdir, 1)[:-3] + 'run'
    tmp_path = run_path[:-3] + 'tmp'
    out_path = run_path[:-3] + 'out'
    list_path = cnf_path.replace(querydir, '')[1:-4]

    # Create the output directory (no check-then-create race)
    os.makedirs(os.path.dirname(run_path), exist_ok=True)

    # Read the configuration file; a malformed file only skips this list
    cnf_file = configparser.ConfigParser()
    try:
        cnf_file.read(cnf_path)
        project = cnf_file['query']['project']
        frequency = cnf_file['query']['frequency']
    except (KeyError, configparser.Error):
        logging.exception('Invalid configuration in %s', list_path)
        return

    if frequency == 'none':
        logging.info('Skipping %s', list_path)
        return

    try:
        delta = deltas[frequency]
    except KeyError:
        logging.exception('Invalid frequency in %s', list_path)
        return

    # Read the run file; fall back to "never run" if it is missing,
    # incomplete or malformed
    run_file = configparser.ConfigParser()
    try:
        run_file.read(run_path, encoding='utf-8')
        last_run = _parse_timestamp(run_file['output']['last_run'])
        run_counter = int(run_file['output']['run_counter'])
        total_runtime = float(run_file['output']['total_runtime'])
    except (KeyError, ValueError, configparser.Error):
        run_file['output'] = {}
        last_run = datetime(1970, 1, 1)
        run_counter = 0
        total_runtime = 0.0

    # Check if we need to run the query
    if _utc_now() < last_run + delta:
        logging.info('No need to run %s', list_path)
        return

    # Execute the query. The command is passed as an argument list and
    # the redirections are done with file objects, so neither the
    # configured project name nor the paths are interpreted by a shell.
    logging.info('Executing %s', list_path)
    command = [
        'mysql',
        '--defaults-file=' + os.path.expanduser('~/replica.my.cnf'),
        '-h', project + '.analytics.db.svc.eqiad.wmflabs',
        '-BN',
    ]
    start_datetime = _utc_now()
    try:
        with open(sql_path, 'rb') as sql_file, \
                open(tmp_path, 'wb') as tmp_file:
            subprocess.run(command, stdin=sql_file, stdout=tmp_file,
                           check=True)
        # Publish the result only after the query succeeded
        os.replace(tmp_path, out_path)
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing SQL file or mysql binary and a
        # failing rename, which would otherwise abort the whole run
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        logging.exception('Subprocess error for %s', list_path)
        return
    end_datetime = _utc_now()
    runtime = (end_datetime - start_datetime).total_seconds()

    # Write run file. The timestamp always carries microseconds so it
    # can be parsed back even when the fraction happens to be zero.
    run_file['output']['last_run'] = end_datetime.strftime(
        '%Y-%m-%d %H:%M:%S.%f')
    run_file['output']['last_runtime'] = str(runtime)
    run_file['output']['run_counter'] = str(run_counter + 1)
    run_file['output']['total_runtime'] = str(
        round(total_runtime + runtime, 6))
    with open(run_path, 'w', encoding='utf-8') as fp:
        run_file.write(fp)
if __name__ == '__main__':
    logging.info('Scheduler task has started')
    # Walk the whole query tree and hand every file whose name ends in
    # 'cnf' to process_list
    for directory, _subdirs, entries in os.walk(querydir):
        for entry in entries:
            if not entry.endswith('cnf'):
                continue
            process_list(os.path.join(directory, entry))
    logging.info('Scheduler task has ended')