-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhourly_processing.py
66 lines (56 loc) · 1.76 KB
/
hourly_processing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from datetime import datetime, timedelta
from airflow import DAG
from constants import (
LA_METRO_DATABASE_URL,
LA_METRO_SEARCH_URL,
START_DATE,
LA_METRO_CONFIGS,
ENVIRONMENT,
LA_METRO_IMAGE_URL
)
from operators.blackbox_docker_operator import TaggedDockerOperator
# Shared settings handed to the DAG in this module via ``default_args=``.
default_args = {
    "start_date": START_DATE,
    # Upper bound on how long a single task may run.
    "execution_timeout": timedelta(minutes=20),
    "image": LA_METRO_IMAGE_URL,
    "environment": {
        "DATABASE_URL": LA_METRO_DATABASE_URL,
        "SEARCH_URL": LA_METRO_SEARCH_URL,
        "SENTRY_ENVIRONMENT": ENVIRONMENT,
        # Unpacked last, so any key LA_METRO_CONFIGS shares with the three
        # entries above replaces the value given there.
        **LA_METRO_CONFIGS,
    },
}
# DAG that runs four ``manage.py`` commands one after another on the
# "10,25,40,55 * * * *" cron schedule.
with DAG(
    "hourly_processing",
    default_args=default_args,
    schedule_interval="10,25,40,55 * * * *",
    description=(
        "Refresh the document cache, compile bill and event packets, extract "
        "attachment text, update the search index, and confirm the search "
        "index and database contain the same number of bills at 10, 25, 40, "
        "and 55 minutes past the hour."
    ),
) as dag:
    t1 = TaggedDockerOperator(
        task_id="refresh_pic",
        command="python manage.py refresh_pic",
    )

    t2 = TaggedDockerOperator(
        task_id="compile_pdfs",
        command="python manage.py compile_pdfs",
    )

    t3 = TaggedDockerOperator(
        task_id="convert_attachment_text",
        command="python manage.py convert_attachment_text",
    )

    # Pick the update_index flags from the wall-clock minute: before minute
    # 55 the command carries ``--age=1``; from minute 55 onward that option
    # is left off.
    # NOTE(review): datetime.now() is evaluated whenever this module is
    # executed (i.e. when the DAG file is parsed) and reads the host's local
    # clock, not the run's scheduled time -- confirm this still selects the
    # intended command for the :55 run, e.g. when the upstream tasks finish
    # after the top of the hour.
    update_index_command = (
        "python manage.py update_index --batch-size=100 --age=1 --remove"
        if datetime.now().minute < 55
        else "python manage.py update_index --batch-size=100 --remove"
    )

    t4 = TaggedDockerOperator(
        task_id="update_index",
        command=update_index_command,
    )

    # Chain the tasks in order: refresh_pic, compile_pdfs,
    # convert_attachment_text, update_index.
    t1 >> t2 >> t3 >> t4