forked from Kapernikov/luigi_demo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathle_fetch_data.py
executable file
·34 lines (26 loc) · 1.04 KB
/
le_fetch_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import requests
import csv
import pandas as pd
import json
import luigi
from luigi.contrib import sqla
import datetime
from le_utils import *
import sqlalchemy
from le_targets import *
from le_create_db import CreateDB
class GetEQData(luigi.Task):
    '''Task that loads the earthquake data set into the configured database.

    The data is read from the local CSV file ``data/eq_data.csv`` (resolved
    relative to the current working directory) and written to the
    ``earthquakes`` table, replacing any existing table of that name.

    NOTE(review): the task name and the original docstring say the data comes
    from the USGS website, but this task only reads a local CSV file;
    presumably the download happens elsewhere -- confirm.
    '''

    # Table written by run() and reported by output(); kept in one place so
    # the two cannot drift apart.
    table_name = 'earthquakes'

    # Key into DB_ENGINES selecting the target engine. Defaults to the 'eq'
    # engine (linked to the eq_db sqlite database, per the original comment).
    engine_name = luigi.Parameter(default='eq')

    def _engine(self):
        '''Return the engine object registered under ``self.engine_name``.

        Raises:
            KeyError: if ``engine_name`` is not a key of ``DB_ENGINES``.
        '''
        try:
            return DB_ENGINES[self.engine_name]
        except KeyError:
            # Same exception type as a bare dict lookup, but with a message
            # that lists the valid choices.
            raise KeyError(
                'Unknown engine_name %r; expected one of %r'
                % (self.engine_name, list(DB_ENGINES))
            )

    def requires(self):
        '''Depend on CreateDB for the main database file.

        NOTE(review): this always targets MAIN_DB_PATH, regardless of
        ``engine_name`` -- confirm the selected engine points at that file.
        '''
        return CreateDB(db_file_name=MAIN_DB_PATH)

    def output(self):
        '''Return the target for ``table_name`` on the selected engine.'''
        return SQLiteTableTarget(table=self.table_name, eng=self._engine())

    def run(self):
        '''Load ``data/eq_data.csv``, parse its ``time`` column and write the
        result to ``table_name``, replacing any existing table.'''
        # Imported locally so the task does not rely on ``os`` leaking in
        # through ``from le_utils import *``.
        import os

        data = pd.read_csv(os.path.join('data', 'eq_data.csv'))
        # read_csv leaves the timestamps as text; convert them to datetime
        # values. Item assignment (unlike ``data.time = ...``) always targets
        # the column rather than possibly creating a plain attribute.
        data['time'] = pd.to_datetime(
            data['time'], format='%Y-%m-%dT%H:%M:%S.%f'
        )
        data.to_sql(
            self.table_name,
            con=self._engine(),
            if_exists='replace',
            index=False,
        )