This repository has been archived by the owner on Jul 7, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
/
run.py
117 lines (87 loc) · 4.18 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import argparse
import os
import sys
import json
from tqdm import tqdm
from bulletin_download import main as bulletin_downloader
from db.main import DBMain
from local_extractor import main as extractor_main
from local_extractor.utils import custom_exceptions
# Two-letter state codes handled by the pipeline; the parser stage walks them
# in exactly this order.
STATES = [
    'DL',
    'GA',
    'HR',
    'KA',
    'KL',
    'MH',
    'PB',
    'TG',
    'TN',
    'UK',
    'WB',
]

# Keys of the per-state metadata JSON (<datadir>/metadata/bulletins/<state>.json).
# Dates whose bulletin is currently recorded as downloaded.
DOWNLOADED_BULLETINS_STR = 'downloaded-bulletins'
# Mapping of bulletin date -> bulletin file path.
BULLETIN_PATH_STR = 'bulletin-paths'
# Dates whose bulletin was parsed and inserted successfully.
PROCESSED_DATES_STR = 'processed-dates'
def get_parser():
    """Build the command-line parser for the pipeline.

    Returns:
        argparse.ArgumentParser: parser exposing the required ``--datadir``
        option, the optional comma-separated ``--run_only`` and
        ``--force_run_states`` options, and three ``--skip_*`` switches that
        each default to ``False``.
    """
    cli = argparse.ArgumentParser()

    # The only mandatory option: where bulletins and the database live.
    cli.add_argument(
        '--datadir',
        type=str,
        required=True,
        help='Data directory path to store bulletins and database',
    )

    # Optional string options; both take comma-separated state codes.
    optional_values = (
        ('--run_only',
         'Comma-separated values of states to run data extraction for'),
        ('--force_run_states',
         'Comma-separated values of states to force re-run data extraction procedure for'),
    )
    for flag, description in optional_values:
        cli.add_argument(flag, type=str, required=False, default=None, help=description)

    # Boolean switches; passing one disables the corresponding stage.
    switches = (
        ('--skip_bulletin_downloader',
         'Should the procedure execute bulletin downloader or not?'),
        ('--skip_db_setup',
         'Should the procedure execute DB setup or not?'),
        ('--skip_bulletin_parser',
         'Should the procedure execute bulletin parser or not?'),
    )
    for flag, description in switches:
        cli.add_argument(flag, default=False, action='store_true', help=description)

    return cli
def run(args):
    """Run the pipeline stages selected on the command line.

    Stages, in order: download bulletins, set up the database (recording any
    freshly downloaded bulletin links), then parse every pending bulletin of
    every selected state into the database. Each stage is skipped when its
    ``skip_*`` flag is set.

    Args:
        args: Namespace produced by ``get_parser()``. Reads ``datadir``,
            ``run_only``, ``force_run_states``, ``skip_bulletin_downloader``,
            ``skip_db_setup`` and ``skip_bulletin_parser``.

    Returns:
        None.
    """
    # States to restrict the run to; None means "all states".
    states_to_execute = None
    if args.run_only is not None:
        states_to_execute = [x.strip() for x in args.run_only.split(',')]

    # States whose already-processed bulletins must be parsed again.
    force_rerun_states = []
    if args.force_run_states is not None:
        force_rerun_states = [x.strip() for x in args.force_run_states.split(',')]

    bulletin_links = None

    # Download bulletins
    if args.skip_bulletin_downloader:
        print('Skipping bulletin downloader section')
    else:
        bulletin_links = bulletin_downloader.run(args.datadir, state_to_execute=states_to_execute)

    # Setup tables
    db_obj = None
    if args.skip_db_setup:
        print('Skipping DB setup')
    else:
        db_obj = DBMain(args.datadir)
        db_obj.record_db_metadata()
        if bulletin_links is not None:
            db_obj.record_bulletin_links(bulletin_links)

    # Start extraction
    if args.skip_bulletin_parser:
        print('Skipping bulletin parser routine')
        return

    # The parser inserts rows, so it needs a DB handle even when the setup
    # stage was skipped. Previously ``db_obj`` was unbound in that case and
    # every bulletin failed with a swallowed NameError.
    # NOTE(review): assumes constructing DBMain without record_db_metadata()
    # is enough to insert into an existing database - confirm.
    if db_obj is None:
        db_obj = DBMain(args.datadir)

    state_pbar = tqdm(STATES, desc="States")
    for state in state_pbar:
        if states_to_execute is not None and state not in states_to_execute:
            continue
        state_pbar.set_description(f'State: {state}')
        _process_state_bulletins(
            state,
            args.datadir,
            db_obj,
            force_rerun=state in force_rerun_states,
        )


def _process_state_bulletins(state, datadir, db_obj, force_rerun=False):
    """Parse one state's pending bulletins and rewrite its metadata file.

    Args:
        state: State code; selects ``<datadir>/metadata/bulletins/<state>.json``.
        datadir: Root data directory.
        db_obj: Object exposing ``insert_for_state(state, stateinfo)``.
        force_rerun: When true, bulletins already listed as processed are
            parsed again instead of being skipped.
    """
    metadata_path = os.path.join(datadir, 'metadata', 'bulletins', f'{state}.json')
    with open(metadata_path, 'r') as f:
        data = json.load(f)

    processed_dates = data.setdefault(PROCESSED_DATES_STR, [])
    # Set mirror of the list: O(1) membership tests and duplicate protection.
    already_processed = set(processed_dates)

    date_pbar = tqdm(data[BULLETIN_PATH_STR].items(), desc="Dates", leave=False)
    for date, fpath in date_pbar:
        date_pbar.set_description(f'Date: {date}')

        if not force_rerun and date in already_processed:
            continue

        try:
            stateinfo = extractor_main.extract_info(state, date, fpath)
            db_obj.insert_for_state(state, stateinfo)
        except custom_exceptions.UnprocessedBulletinException:
            # Bulletin failed validation checks. Remove from metadata
            print(f'{state} Bulletin for date {date} failed validation checks')
            # .get(): a KeyError raised inside this handler would escape the
            # sibling ``except Exception`` and abort the whole run.
            downloaded = data.get(DOWNLOADED_BULLETINS_STR, [])
            if date in downloaded:
                downloaded.remove(date)
        except Exception as err:
            # Per-bulletin boundary: report and continue with the next date.
            print(f'Error in parsing date: {date}. Error: {err}')
        else:
            # A forced re-run revisits dates that are already recorded; do
            # not append them a second time.
            if date not in already_processed:
                processed_dates.append(date)
                already_processed.add(date)

    with open(metadata_path, 'w') as f:
        json.dump(data, f)
# Script entry point: parse the command line and hand the namespace to run().
if __name__ == '__main__':
    run(get_parser().parse_args())