forked from ericpruitt/cronex
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cronex.py
executable file
·355 lines (303 loc) · 13.4 KB
/
cronex.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
#!/usr/bin/env python
"""
This module provides a class for cron-like scheduling systems, and
exposes the function used to convert static cron expressions to Python
sets.
CronExpression objects are instantiated with a cron formatted string
that represents the times when the trigger is active. When using
expressions that contain periodic terms, an extension of cron created
for this module, a starting epoch should be explicitly defined. When the
epoch is not explicitly defined, it defaults to the Unix epoch. Periodic
terms provide a method of recurring triggers based on arbitrary time
periods.
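Expressions carry six time fields -- seconds, minutes, hours, day of month,
month and day of week -- optionally followed by a comment. Date tuples passed
to check_trigger are (year, month, day, hour, minute, second), and an epoch
may carry its UTC offset as a seventh element.
Standard Cron Triggers:
>>> job = CronExpression("0 0 0 * * 1-5/2 find /var/log -delete")
>>> job.check_trigger((2010, 11, 17, 0, 0, 0))
True
>>> job.check_trigger((2012, 12, 22, 0, 0, 0))
False
Periodic Trigger:
>>> job = CronExpression("0 0 %9 * * * Feed 'it'", (2010, 5, 1, 7, 0, 0, -6))
>>> job.comment
"Feed 'it'"
>>> job.check_trigger((2010, 5, 1, 7, 0, 0), utc_offset=-6)
True
>>> job.check_trigger((2010, 5, 1, 16, 0, 0), utc_offset=-6)
True
>>> job.check_trigger((2010, 5, 2, 1, 0, 0), utc_offset=-6)
True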
"""
import datetime
import calendar
import re
# Names exported by a star-import of this module.
__all__ = ["CronExpression", "parse_atom", "DEFAULT_EPOCH", "SUBSTITUTIONS",
"is_special_atom"]
# Licensing terms declared for this module.
__license__ = "Public Domain, MIT, or FreeBSD"
# Python 3 compatibility: the rest of the module expects map() and zip() to
# return lists and expects xrange() to exist. The real builtins are captured
# in private names before being shadowed, because ``__builtins__`` is a dict
# in imported modules but the builtins module itself when the file runs as
# ``__main__``, so subscripting it is not reliable.
if isinstance(map, type):
    _builtin_map = map
    _builtin_zip = zip

    def map(*args):
        """List-returning wrapper around the lazy built-in map."""
        return list(_builtin_map(*args))

    def zip(*args):
        """List-returning wrapper around the lazy built-in zip."""
        return list(_builtin_zip(*args))

    xrange = range

# (name, number) pairs used to translate day names; Sunday is 0.
DAY_NAMES = zip(('sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat'), xrange(7))

# Inclusive (minimum, maximum) bounds of each time field.
SECONDS = (0, 59)
MINUTES = (0, 59)
HOURS = (0, 23)
DAYS_OF_MONTH = (1, 31)
MONTHS = (1, 12)
DAYS_OF_WEEK = (0, 6)

# Fields in which the "L" modifier is accepted.
L_FIELDS = (DAYS_OF_WEEK, DAYS_OF_MONTH)

# Field bounds in the order the fields appear in an expression.
FIELD_RANGES = (SECONDS, MINUTES, HOURS, DAYS_OF_MONTH, MONTHS, DAYS_OF_WEEK)

# (name, number) pairs used to translate month names; January is 1.
MONTH_NAMES = zip(('jan', 'feb', 'mar', 'apr', 'may', 'jun',
                   'jul', 'aug', 'sep', 'oct', 'nov', 'dec'), xrange(1, 13))

# Default epoch for periodic terms: (year, month, day, hour, minute, second).
DEFAULT_EPOCH = (1970, 1, 1, 0, 0, 0)

# Shorthand aliases and the six-field expressions they expand to.
SUBSTITUTIONS = {
    "@yearly": "0 0 0 1 1 *",
    "@annually": "0 0 0 1 1 *",
    "@monthly": "0 0 0 1 * *",
    "@weekly": "0 0 0 * * 0",
    "@daily": "0 0 0 * * *",
    "@midnight": "0 0 0 * * *",
    "@hourly": "0 0 * * * *"
}

# Syntax checks for the special atoms. Every pattern is anchored at both ends
# so trailing characters (e.g. "1#23") are rejected instead of being ignored.
VALIDATE_POUND = re.compile(r"^[0-6]#[1-5]$")
VALIDATE_L_IN_DOW = re.compile(r"^[0-6]L$")
VALIDATE_W = re.compile(r"^[0-3]?[0-9]W$")
class CronExpression(object):
    """
    A cron-style trigger built from a six-field expression.

    The fields are seconds, minutes, hours, day of month, month and day of
    week, optionally followed by a free-form comment. Instances expose:

    string_tab    -- the six upper-cased field strings
    numerical_tab -- one set of static matching values per field
    comment       -- the text following the sixth field ('' if absent)
    epoch         -- (year, month, day, hour, minute, second, utc_offset),
                     the reference point for periodic ("%N") atoms
    """

    def __init__(self, line, epoch=DEFAULT_EPOCH, epoch_utc_offset=0):
        """
        Instantiates a CronExpression object with an optionally defined epoch.

        'line' is the cron expression; a leading alias listed in
        SUBSTITUTIONS (such as "@daily") is expanded first. 'epoch' is
        (year, month, day, hour, minute, second), sorted by descending
        significance. The UTC offset of the epoch can be specified one of two
        ways: as a seventh element in 'epoch' or supplied in
        'epoch_utc_offset', which is only used when 'epoch' has six elements.

        Raises ValueError when the expression has fewer than six time fields,
        when a field is malformed, or when 'epoch' has fewer than six
        elements.
        """
        for key, value in SUBSTITUTIONS.items():
            if line.startswith(key):
                # Expand only the leading alias; the same text appearing
                # later in the comment must be left untouched.
                line = value + line[len(key):]
                break

        fields = line.split(None, 6)
        if len(fields) < 6:
            raise ValueError(
                "Expected six time fields but found %d." % len(fields))
        if len(fields) == 6:
            fields.append('')
        seconds, minutes, hours, dom, months, dow, self.comment = fields

        # "7" is an alias for Sunday. Periodic atoms are skipped so that the
        # digits of a period such as "%7" or "%17" are not rewritten.
        dow = ','.join(
            atom if atom.startswith('%') else atom.replace('7', '0')
            for atom in dow.split(','))
        dow = dow.replace('?', '*')
        dom = dom.replace('?', '*')

        months = months.lower()
        for monthstr, monthnum in MONTH_NAMES:
            months = months.replace(monthstr, str(monthnum))

        dow = dow.lower()
        for dowstr, downum in DAY_NAMES:
            dow = dow.replace(dowstr, str(downum))

        # Upper-case so the "L" and "W" modifiers are found however typed.
        self.string_tab = [
            field.upper()
            for field in (seconds, minutes, hours, dom, months, dow)]
        self.compute_numtab()

        epoch = tuple(epoch)
        if len(epoch) < 6:
            raise ValueError(
                "The epoch must be defined down to the second.")
        if len(epoch) == 6:
            epoch += (epoch_utc_offset,)
        self.epoch = epoch

    def __str__(self):
        """
        Returns a constructor-like representation of the expression. The
        epoch is only shown when it differs from the default.
        """
        cron_line = list(self.string_tab)
        if self.comment:
            cron_line.append(str(self.comment))
        arguments = '"' + ' '.join(cron_line) + '"'

        # self.epoch carries the UTC offset as a seventh element, so the
        # default has to be padded with a zero offset before comparing.
        if tuple(self.epoch) != tuple(DEFAULT_EPOCH) + (0,):
            arguments += ", epoch=" + repr(self.epoch)
        return "%s(%s)" % (self.__class__.__name__, arguments)

    def __repr__(self):
        """Returns the same text as str()."""
        return str(self)

    def compute_numtab(self):
        """
        Recomputes the sets for the static ranges of the trigger time.

        This method should only be called by the user if the string_tab
        member is modified. Raises ValueError if "*" is combined with other
        atoms in a field or if an atom is malformed.
        """
        self.numerical_tab = []

        for field_str, span in zip(self.string_tab, FIELD_RANGES):
            split_field_str = field_str.split(',')
            if len(split_field_str) > 1 and "*" in split_field_str:
                raise ValueError("\"*\" must be alone in a field.")

            unified = set()
            for cron_atom in split_field_str:
                # parse_atom only handles static cases; special atoms are
                # validated here and evaluated later by check_trigger.
                if not is_special_atom(cron_atom, span):
                    unified.update(parse_atom(cron_atom, span))

            self.numerical_tab.append(unified)

        # When only one of day-of-month / day-of-week is restricted, empty
        # the unrestricted one so its "*" does not match every date.
        if self.string_tab[3] == "*" and self.string_tab[5] != "*":
            self.numerical_tab[3] = set()
        elif self.string_tab[5] == "*" and self.string_tab[3] != "*":
            self.numerical_tab[5] = set()

    def check_trigger(self, date_tuple, utc_offset=0):
        """
        Returns boolean indicating if the trigger is active at the given time.

        'date_tuple' is (year, month, day, hour, minute, second) and should
        be in the local time. Unless periodicities are used, utc_offset does
        not need to be specified. If periodicities are used, specifically in
        the hour, minute and second fields, it is crucial that the utc_offset
        is specified.
        """
        year, month, day, hour, mins, sec = date_tuple
        given_date = datetime.date(year, month, day)
        zeroday = datetime.date(*self.epoch[:3])
        last_dom = calendar.monthrange(year, month)[-1]
        dom_matched = True

        # In calendar and datetime.date.weekday, Monday = 0; cron uses
        # Sunday = 0.
        given_dow = (datetime.date.weekday(given_date) + 1) % 7
        # Day of the week on which the first of the month falls.
        first_dow = (given_dow + 1 - day) % 7

        # Figure out how much time has passed from the epoch to the given date
        utc_diff = utc_offset - self.epoch[6]
        mod_delta_yrs = year - self.epoch[0]
        mod_delta_mon = month - self.epoch[1] + mod_delta_yrs * 12
        mod_delta_day = (given_date - zeroday).days
        mod_delta_hrs = hour - self.epoch[3] + mod_delta_day * 24 + utc_diff
        mod_delta_min = mins - self.epoch[4] + mod_delta_hrs * 60
        mod_delta_sec = sec - self.epoch[5] + mod_delta_min * 60

        # Makes iterating through like components easier. The day-of-week
        # field uses the elapsed days for its periodic terms.
        quintuple = zip(
            (sec, mins, hour, day, month, given_dow),
            self.numerical_tab,
            self.string_tab,
            (mod_delta_sec, mod_delta_min, mod_delta_hrs, mod_delta_day,
             mod_delta_mon, mod_delta_day),
            FIELD_RANGES)

        for value, valid_values, field_str, delta_t, field_type in quintuple:
            # All valid, static values for the fields are stored in sets
            if value in valid_values:
                continue

            # The following for loop implements the logic for context
            # sensitive and epoch sensitive constraints. break statements,
            # which are executed when a match is found, lead to a continue
            # in the outer loop. If there are no matches found, the given date
            # does not match expression constraints, so the function returns
            # False as seen at the end of this for...else... construct.
            for cron_atom in field_str.split(','):
                if cron_atom[0] == '%':
                    # Periodic atom: matches every N units since the epoch.
                    if not (delta_t % int(cron_atom[1:])):
                        break

                elif '#' in cron_atom:
                    D, N = int(cron_atom[0]), int(cron_atom[2])
                    # Computes Nth occurrence of D day of the week
                    if (((D - first_dow) % 7) + 1 + 7 * (N - 1)) == day:
                        break

                elif cron_atom[-1] == 'W':
                    target = min(int(cron_atom[:-1]), last_dom)
                    lands_on = (first_dow + target - 1) % 7
                    if lands_on == 0:
                        # Shift from Sun. to Mon. unless Mon. is next month
                        target += 1 if target < last_dom else -2
                    elif lands_on == 6:
                        # Shift from Sat. to Fri. unless Fri. in prior month
                        target += -1 if target > 1 else 2

                    # Break if the day is correct, and target is a weekday
                    if target == day and (first_dow + target) % 7 > 1:
                        break

                elif cron_atom[-1] == 'L':
                    # In dom field, L means the last day of the month
                    target = last_dom

                    if field_type == DAYS_OF_WEEK:
                        # Calculates the last occurrence of given day of week
                        desired_dow = int(cron_atom[:-1])
                        target = (((desired_dow - first_dow) % 7) + 29)
                        target -= 7 if target > last_dom else 0

                    if target == day:
                        break
            else:
                # When both day-of-month and day-of-week are restricted, a
                # date matching either one is accepted.
                if field_type == DAYS_OF_MONTH and self.string_tab[5] != '*':
                    dom_matched = False
                    continue
                elif field_type == DAYS_OF_WEEK and self.string_tab[3] != '*':
                    # If we got here, then days of months validated so it does
                    # not matter that days of the week failed.
                    return dom_matched

                # None of the expressions matched which means this field fails
                return False

        # Arriving at this point means the date landed within the constraints
        # of all fields; the associated trigger should be fired.
        return True
def is_special_atom(cron_atom, span):
    """
    Returns a boolean indicating whether the atom uses one of the special
    characters "%", "#", "L" or "W".

    True means the atom is special and must not be handed to parse_atom;
    False means it contains no special character and parse_atom can turn it
    into a static set. 'span' is the (minimum, maximum) pair of the field the
    atom belongs to and determines which special characters are allowed.

    The syntax of the first special character found (searched in the order
    "%", "#", "L", "W") is checked, and ValueError is raised when it is used
    in the wrong field or is malformed.
    """
    for special_char in ('%', '#', 'L', 'W'):
        if special_char in cron_atom:
            break
    else:
        # No special character present.
        return False

    if special_char == '%':
        # Periodic atoms look like "%N" with N greater than one.
        period = cron_atom[1:]
        if not (period.isdigit() and int(period) > 1):
            raise ValueError("\"%\" syntax incorrect.")

    elif special_char == '#':
        if span != DAYS_OF_WEEK:
            raise ValueError("\"#\" invalid where used.")
        if not VALIDATE_POUND.match(cron_atom):
            raise ValueError("\"#\" syntax incorrect.")

    elif special_char == 'L':
        if span not in L_FIELDS:
            raise ValueError("\"L\" invalid where used.")
        if span == DAYS_OF_MONTH:
            if cron_atom != "L":
                raise ValueError("\"L\" must be alone in days of month.")
        elif not VALIDATE_L_IN_DOW.match(cron_atom):
            raise ValueError("\"L\" syntax incorrect.")

    else:
        # Only "W" is left. The messages follow the same convention as "#"
        # and "L": wrong field first, then malformed syntax.
        if span != DAYS_OF_MONTH:
            raise ValueError("\"W\" invalid where used.")
        if not (VALIDATE_W.match(cron_atom) and int(cron_atom[:-1]) > 0):
            raise ValueError("\"W\" syntax incorrect.")

    return True
def parse_atom(parse, minmax):
    """
    Returns a set containing valid values for a given cron-style range of
    numbers. The 'minmax' argument is a two element iterable containing the
    inclusive lower and upper limits of the expression.

    Accepted forms are "*", a single number, "a-b", "a-b/n" and "*/n". When
    'a' is not smaller than 'b' the range wraps around the upper limit; as a
    consequence equal endpoints ("5-5") cover the whole span.

    Raises ValueError if a value lies outside the limits, if the step is not
    a positive integer, or if the atom is not in a recognized format.

    Examples:
    >>> sorted(parse_atom("1-5",(0,6)))
    [1, 2, 3, 4, 5]
    >>> sorted(parse_atom("*/6",(0,23)))
    [0, 6, 12, 18]
    >>> sorted(parse_atom("18-6/4",(0,23)))
    [2, 6, 18, 22]
    >>> sorted(parse_atom("*/9",(0,23)))
    [0, 9, 18]
    """
    parse = parse.strip()
    lower, upper = minmax

    if parse == '*':
        return set(range(lower, upper + 1))

    if parse.isdigit():
        # A single number still needs to be returned as a set
        value = int(parse)
        if not lower <= value <= upper:
            raise ValueError("\"%s\" is not within valid range." % parse)
        return set((value,))

    if '-' not in parse and '/' not in parse:
        raise ValueError("Atom \"%s\" not in a recognized format." % parse )

    divide = parse.split('/')
    if len(divide) > 2:
        # More than one "/" is malformed rather than something to ignore.
        raise ValueError("Atom \"%s\" not in a recognized format." % parse )

    subrange = divide[0]
    increment = 1
    if len(divide) == 2:
        # Example: 1-3/5 or */7 increment should be 5 and 7 respectively
        increment = int(divide[1])
        if increment < 1:
            raise ValueError("\"%s\" has an invalid step." % parse)

    if '-' in subrange:
        # Example: a-b
        prefix, suffix = [int(n) for n in subrange.split('-')]
        # Both endpoints must lie inside the field, including for
        # wrap-around ranges where prefix is larger than suffix.
        if not (lower <= prefix <= upper and lower <= suffix <= upper):
            raise ValueError("\"%s\" is not within valid range." % parse)
    elif subrange == '*':
        # Include all values with the given range
        prefix, suffix = lower, upper
    else:
        raise ValueError("Unrecognized symbol \"%s\"" % subrange)

    if prefix < suffix:
        # Example: 7-10
        return set(range(prefix, suffix + 1, increment))

    # Example: 12-4/2; (12, 12 + n, ..., 12 + m*n) U (n_0, ..., 4)
    noskips = list(range(prefix, upper + 1))
    noskips += list(range(lower, suffix + 1))
    return set(noskips[::increment])