-
Notifications
You must be signed in to change notification settings - Fork 0
/
db_creator.py
260 lines (245 loc) · 12.5 KB
/
db_creator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
#pylint: disable=C0321
"""
create sqlite3 database from csv file
"""
# Standard Library
import os
import csv
import pprint
import sqlite3
# Dependent Module
import settings
__version__ = "0.1"
IMPORTED_FOLDERS = "IMPORTED_FOLDERS"
FLOOR_CHT = ["", "一", "二", "三", "四", "五", "六", "七", "八", "九"]
def floor(txt: str) -> int:
""" convert Chinese level into Arabic number """
underground = True if txt[:2] == "地下" else False
target = txt[2:-1] if underground else txt [:-1]
units = target.split(sep="十")
total = 0; exp = 1 # init local variable
for unit in units[::-1]:
if not unit and exp != 1:
digit = 1
else:
try:
digit = FLOOR_CHT.index(unit)
except ValueError:
return None
total += (digit * exp)
exp *= 10
total = total if not underground else -total
return total if total else None
def floor_all(lst: list) -> list:
""" shortcut for converting a list of Chinese levels """
numerals = [floor(txt) for txt in lst if floor(txt)]
return sorted(numerals)
def init_db(cur: sqlite3.Cursor) -> str:
""" initialize database with essential data tables """
cur.execute('''CREATE TABLE IF NOT EXISTS {0}(
quarter TEXT PRIMARY KEY,
createdAt TEXT NOT NULL,
geocode_log INTEGER NOT NULL DEFAULT 0 CHECK(
geocode_log >= 0 AND geocode_log <= 67108863
)
);'''.format(IMPORTED_FOLDERS))
cur.execute('''CREATE TABLE IF NOT EXISTS 建物型態(
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL UNIQUE,
count INTEGER DEFAULT 0
);''')
cur.execute('''CREATE TABLE IF NOT EXISTS 主要用途(
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL UNIQUE,
count INTEGER DEFAULT 0
);''')
cur.execute('''CREATE TABLE IF NOT EXISTS 主要建材(
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL UNIQUE,
count INTEGER DEFAULT 0
);''')
cur.execute('''CREATE TABLE IF NOT EXISTS 都市土地使用分區(
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL UNIQUE,
count INTEGER DEFAULT 0
);''')
cur.execute('''CREATE TABLE IF NOT EXISTS 非都市土地使用分區(
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL UNIQUE,
count INTEGER DEFAULT 0
);''')
cur.execute('''CREATE TABLE IF NOT EXISTS 非都市土地使用編定(
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL UNIQUE,
count INTEGER DEFAULT 0
);''')
cur.execute('''CREATE TABLE IF NOT EXISTS 車位類別(
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL UNIQUE,
count INTEGER DEFAULT 0
);''')
cur.execute("SELECT quarter FROM {0}".format(IMPORTED_FOLDERS))
return [t[0] for t in cur.fetchall()]
def create_table(cur: sqlite3.Cursor, prefix: str):
""" create data tables with the same prefix """
cur.execute('''CREATE TABLE IF NOT EXISTS "{0}/TRX"(
編號 TEXT PRIMARY KEY,
縣市 TEXT NOT NULL CHECK(length(縣市) == 3),
鄉鎮市區 TEXT NOT NULL CHECK(length(鄉鎮市區) <= 4),
土地區段位置或建物區門牌 TEXT NOT NULL,
交易年月日 TEXT NOT NULL,
總價元 INTEGER NOT NULL,
單價每平方公尺 INTEGER,
親友間交易 INTEGER NOT NULL,
含增建 INTEGER NOT NULL
);'''.format(prefix))
cur.execute('''CREATE TABLE IF NOT EXISTS "{0}/GEO"(
編號 TEXT PRIMARY KEY,
LAT_1 REAL, LON_1 REAL,
LAT_2 REAL, LON_2 REAL,
LAT_3 REAL, LON_3 REAL,
LAT_4 REAL, LON_4 REAL,
LAT_5 REAL, LON_5 REAL,
LAT_Avg REAL, LON_Avg REAL,
FOREIGN KEY(編號) REFERENCES "{0}/TRX"(編號)
);'''.format(prefix))
cur.execute('''CREATE TABLE IF NOT EXISTS "{0}/BUILD"(
編號 TEXT PRIMARY KEY,
總樓層數 INTEGER,
移轉層次 TEXT,
建物型態 INTEGER NOT NULL,
主要用途 INTEGER,
主要建材 INTEGER,
建築完成年月 TEXT,
建物移轉總面積平方公尺 INTEGER NOT NULL,
'建物現況格局-隔間' INTEGER NOT NULL,
'建物現況格局-房' INTEGER NOT NULL,
'建物現況格局-廳' INTEGER NOT NULL,
'建物現況格局-衛' INTEGER NOT NULL,
有無管理組織 INTEGER NOT NULL,
FOREIGN KEY(編號) REFERENCES "{0}/TRX"(編號),
FOREIGN KEY(建物型態) REFERENCES 建物型態(id),
FOREIGN KEY(主要用途) REFERENCES 主要用途(id),
FOREIGN KEY(主要建材) REFERENCES 主要建材(id)
);'''.format(prefix))
cur.execute('''CREATE TABLE IF NOT EXISTS "{0}/LAND"(
編號 TEXT PRIMARY KEY,
土地移轉總面積平方公尺 REAL NOT NULL,
都市土地使用分區 INTEGER,
非都市土地使用分區 INTEGER,
非都市土地使用編定 INTEGER,
FOREIGN KEY(編號) REFERENCES "{0}/TRX"(編號),
FOREIGN KEY(都市土地使用分區) REFERENCES 都市土地使用分區(id),
FOREIGN KEY(非都市土地使用分區) REFERENCES 非都市土地使用分區(id),
FOREIGN KEY(非都市土地使用編定) REFERENCES 非都市土地使用編定(id)
);'''.format(prefix))
cur.execute('''CREATE TABLE IF NOT EXISTS "{0}/PARK"(
編號 TEXT PRIMARY KEY,
車位類別 INTEGER,
車位移轉總面積平方公尺 REAL NOT NULL,
車位總價元 INTEGER NOT NULL,
FOREIGN KEY(編號) REFERENCES "{0}/TRX"(編號),
FOREIGN KEY(車位類別) REFERENCES 車位類別(id)
);'''.format(prefix))
def parse_csv(rdr: csv.DictReader, cur: sqlite3.Cursor, prefix: str, county: str):
""" insert csv into specified data table """
def exist_row(picked_row: dict, fieldname: str):
""" insert new type into specified table or increase count when already exists """
if (not picked_row[fieldname]) or ("見" in picked_row[fieldname]):
return
cur.execute("SELECT EXISTS(SELECT * FROM {0} WHERE type == ?);".format(fieldname),
(picked_row[fieldname],))
if not cur.fetchall()[0][0]:
cur.execute("INSERT INTO {0}(type) VALUES(?);".format(fieldname),
(picked_row[fieldname],))
cur.execute("UPDATE {0} SET count = count + 1 WHERE type = ?;".format(fieldname),
(picked_row[fieldname],))
for row in rdr:
if "建物" not in row["交易標的"]:
continue
if row["鄉鎮市區"] == "fa72埔鄉":
row["鄉鎮市區"] = "鹽埔鄉"
elif row["鄉鎮市區"] == "金fa4b鄉":
row["鄉鎮市區"] = "金峰鄉"
try:
cur.execute('''INSERT INTO "{0}/TRX" VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?
);'''.format(prefix),
(row["編號"], county, row["鄉鎮市區"], row["土地區段位置或建物區門牌"],
row["交易年月日"], row["總價元"], row["單價每平方公尺"],
True if ("親" in row["備註"]) or ("友" in row["備註"]) else False,
True if "增建" in row["備註"] else False))
except sqlite3.IntegrityError:
pprint.pprint(row)
raise
numbers = floor_all([f for f in row["移轉層次"].split(sep=",") if f and f[-1] == "層"])
num_str = ", ".join(str(number) for number in numbers)
cur.execute('''INSERT INTO "{0}/GEO"(編號) VALUES (?);'''.format(prefix), (row["編號"],))
exist_row(row, "建物型態")
exist_row(row, "主要用途")
exist_row(row, "主要建材")
cur.execute('''INSERT INTO "{0}/BUILD" VALUES (
?, ?, ?, (
SELECT id FROM 建物型態 WHERE type == ?
), (
SELECT id FROM 主要用途 WHERE type == ?
), (
SELECT id FROM 主要建材 WHERE type == ?
), ?, ?, ?, ?, ?, ?, ?
);'''.format(prefix),
(row["編號"], floor(row["總樓層數"]), num_str if num_str else None, row["建物型態"],
row["主要用途"], row["主要建材"], row["建築完成年月"], row["建物移轉總面積平方公尺"],
True if row["建物現況格局-隔間"] == "有" else False, row["建物現況格局-房"], row["建物現況格局-廳"],
row["建物現況格局-衛"], True if row["有無管理組織"] == "有" else False))
exist_row(row, "都市土地使用分區")
exist_row(row, "非都市土地使用分區")
exist_row(row, "非都市土地使用編定")
cur.execute('''INSERT INTO "{0}/LAND" VALUES (
?, ?, (
SELECT id FROM 都市土地使用分區 WHERE type == ?
), (
SELECT id FROM 非都市土地使用分區 WHERE type == ?
), (
SELECT id FROM 非都市土地使用編定 WHERE type == ?
)
);'''.format(prefix),
(row["編號"], row["土地移轉總面積平方公尺"], row["都市土地使用分區"], row["非都市土地使用分區"],
row["非都市土地使用編定"]))
exist_row(row, "車位類別")
cur.execute('''INSERT INTO "{0}/PARK" VALUES (
?, (
SELECT id FROM 車位類別 WHERE type == ?
), ?, ?
);'''.format(prefix),
(row["編號"], row["車位類別"], row["車位移轉總面積平方公尺"], row["車位總價元"]))
def main():
""" Main Process """
connection = sqlite3.connect(settings.__main_db__)
cursor = connection.cursor()
table_names = init_db(cursor)
folder_names = next(os.walk(settings.__resources__))[1]
for folder_name in folder_names:
print(folder_name)
folder_path = os.path.join(settings.__resources__, folder_name)
if folder_name in table_names:
continue
cursor.execute("BEGIN;") # Disable auto-commit
create_table(cursor, folder_name)
file_names = os.listdir(folder_path)
for file_name in file_names:
(root, ext) = os.path.splitext(file_name)
if (ext != ".CSV") or (not root.endswith("A")):
continue
file_path = os.path.join(folder_path, file_name)
with open(file_path, 'r', encoding='big5', errors='ignore') as fstream:
reader = csv.DictReader(fstream)
cnty_cht = settings.alpha2cht(root[0])
parse_csv(reader, cursor, folder_name, cnty_cht)
cursor.execute('''INSERT INTO {0}(quarter, createdAt) VALUES (
?, CURRENT_TIMESTAMP
);'''.format(IMPORTED_FOLDERS),
(folder_name,))
connection.commit()
connection.close()
if __name__ == "__main__":
main()