-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathScheduled.py
113 lines (107 loc) · 4.02 KB
/
Scheduled.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import datetime
def handler(system, this):
    """Start a user-chosen flow once per day at a user-chosen time of day.

    The handler runs in one of two phases, selected by whether the
    execution's ``input_value`` contains a ``message_id``:

    1. No ``message_id``: create a message with a form asking for the flow
       name, the time of day and an optional maximum number of iterations,
       save the message id to ``output_value``, start the flow named
       ``Scheduled`` (presumably this same script -- confirm) with that
       message id without waiting for it, and finish.
    2. ``message_id`` present: wait for the response to that message, then
       repeatedly sleep until the next occurrence of the requested time and
       start the requested flow as a child execution without waiting for it.

    Args:
        system: Object providing ``message(...)``; used both to create the
            form message and to look it up again by id.
        this: The current execution; ``get``, ``save``, ``log``, ``sleep``,
            ``flow`` and ``success`` are called on it.

    Returns:
        The value returned by ``this.success(...)``.

    Raises:
        ValueError: If the submitted ``scheduled_at`` does not match
            ``%H:%M:%S%z`` (time of day followed by a UTC offset).
    """
    inputs = this.get('input_value') or {}
    message_id = inputs.get('message_id')

    if message_id is None:
        # Phase 1: ask the user for the schedule details.
        # A missing ``flow_name`` input simply means "no pre-filled value".
        default_flow_name = inputs.get('flow_name')
        default_scheduled_at = '08:30'
        message = system.message(
            subject='Scheduled execution',
            body={
                'type': 'object',
                'properties': {
                    'flow_name': {
                        'label': 'Name of the flow which should be scheduled',
                        'element': 'string',
                        'type': 'string',
                        'example': default_flow_name,
                        'default': default_flow_name,
                        'order': 1,
                    },
                    'scheduled_at': {
                        'label': 'Time when the child execution should be started',
                        'element': 'time',
                        'type': 'string',
                        'format': 'time',
                        'default': default_scheduled_at,
                        'order': 2,
                    },
                    'max_iterations': {
                        'label': 'Maximum number of iterations (unlimited if omitted)',
                        'element': 'number',
                        'type': 'number',
                        'order': 3,
                    },
                    'start': {
                        'label': 'Start schedule',
                        'element': 'submit',
                        'order': 4,
                    },
                },
                'required': [
                    'flow_name',
                    'scheduled_at',
                ],
            },
        )
        message_id = message.get('id')
        this.save(output_value={
            'message_id': message_id,
        })
        # Hand over to a second execution that waits for the form response.
        this.flow(
            'Scheduled',
            name='Scheduled execution',
            message_id=message_id,
            wait=False,
        )
        return this.success('requested details')

    # Phase 2: wait for the form response and run the schedule.
    message = system.message(message_id)
    response = message.wait().get('response')
    this.log(response=response)
    flow_name = response['flow_name']
    scheduled_at = response['scheduled_at']
    max_iterations = response.get('max_iterations')
    this.save(name=f'Scheduled {flow_name}')

    # ``%z`` is mandatory in this format, so the parsed time is always
    # timezone-aware and can be compared with the aware ``now`` below.
    scheduled_at_t = datetime.datetime.strptime(scheduled_at, '%H:%M:%S%z').timetz()
    this.log(scheduled_at_t=scheduled_at_t)
    one_day = datetime.timedelta(days=1)

    iterations = 0
    start = datetime.datetime.now(datetime.timezone.utc).timestamp()
    while max_iterations is None or iterations < max_iterations:
        now = datetime.datetime.now(datetime.timezone.utc)
        this.log(now=now)
        # Take "today" in the timezone of the requested time, not in UTC.
        # Combining the UTC date with a time that carries a different UTC
        # offset can produce a moment that is already past even after adding
        # a day (negative sleep), or one that is more than 24 hours away
        # (skipping the nearest occurrence).
        today = now.astimezone(scheduled_at_t.tzinfo).date()
        this.log(today=today)
        scheduled = datetime.datetime.combine(today, scheduled_at_t)
        this.log(scheduled=scheduled)
        if scheduled < now:  # today's slot has passed; next iteration tomorrow
            scheduled = datetime.datetime.combine(today + one_day, scheduled_at_t)
            this.log(scheduled=scheduled)
        scheduled_ts = scheduled.isoformat(sep=' ', timespec='minutes')
        this.log(scheduled_ts=scheduled_ts)
        delta_sec = (scheduled - now).total_seconds()
        this.log(delta_sec=delta_sec)
        this.save(message=scheduled_ts)
        this.sleep(delta_sec)

        iterations += 1
        this.save(message=f'iteration {iterations}/{max_iterations}')
        # Start child execution
        inputs = {
            'start': start,
            'iterations': iterations,
            'max_iterations': max_iterations,
        }
        this.flow(
            flow_name,
            inputs=inputs,
            name=f'{flow_name} iteration #{iterations}',
            wait=False,
        )
    return this.success(f'started {iterations} iterations')