-
Notifications
You must be signed in to change notification settings - Fork 1
/
database.py
100 lines (82 loc) · 3.43 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from models import Model
from config import Config
import pymysql
from dbutils.pooled_db import PooledDB
class MySQLOperation:
    """Static helpers that turn ``Model`` entities into MySQL statements.

    Every method is static and works against one class-level connection
    pool, which has to be created with :meth:`init_pool` before any
    statement is run.

    NOTE(review): all statements are assembled by string interpolation.
    Rendering of values is delegated to each field's ``to_sql``; confirm that
    it escapes untrusted input, because nothing in this class does.
    """

    # Shared connection pool; stays ``None`` until init_pool() is called.
    _POOL: PooledDB = None

    @staticmethod
    def init_pool():
        """Create the shared pool from ``Config.database`` if none exists yet."""
        if MySQLOperation._POOL is None:
            MySQLOperation._POOL = PooledDB(creator=pymysql, **Config.database)

    @staticmethod
    def close_pool():
        """Call ``close()`` on the shared pool, if one has been created."""
        if MySQLOperation._POOL is not None:
            MySQLOperation._POOL.close()

    @staticmethod
    def _connection():
        """Borrow a connection from the shared pool.

        Raises:
            RuntimeError: If :meth:`init_pool` has not been called yet.
        """
        pool = MySQLOperation._POOL
        if pool is None:
            # Fail with an actionable message instead of an AttributeError
            # on ``None``.
            raise RuntimeError(
                'Connection pool is not initialised; '
                'call MySQLOperation.init_pool() first'
            )
        return pool.connection()

    @staticmethod
    def insert(entity: Model) -> bool:
        """Insert one entity as a single row.

        Args:
            entity: The model instance to store.

        Returns:
            ``True`` when the statement reported at least one affected row.
        """
        table = MySQLOperation.table_name(entity)
        fields = MySQLOperation.fields_substament(entity)
        values = MySQLOperation.__values_substament(entity)
        sql = f'insert into {table}({fields}) \nvalues ({values});'
        return MySQLOperation.execute(sql) > 0

    @staticmethod
    def batch_insert(entity_list: list) -> int:
        """Insert several entities with one multi-row statement.

        The table name and column list are taken from the first entity, so
        all entities are expected to be of the same model type.

        Args:
            entity_list: The model instances to store.

        Returns:
            The affected-row count reported by the statement, or ``0`` when
            ``entity_list`` is empty (no statement is executed in that case).
        """
        if not entity_list:
            # Honour the declared ``int`` return type instead of returning None.
            return 0
        first = entity_list[0]
        table = MySQLOperation.table_name(first)
        fields = MySQLOperation.fields_substament(first)
        mult_values = '\n, '.join(
            f'({MySQLOperation.__values_substament(entity)})'
            for entity in entity_list
        )
        return MySQLOperation.execute(
            f'insert into {table}({fields}) \nvalues {mult_values};'
        )

    @staticmethod
    def select_all(_type: type) -> list:
        """Load every row of the table that belongs to a model type.

        Args:
            _type: A ``Model`` subclass; it is instantiated without arguments
                to derive the table name and column list.

        Returns:
            A list of ``_type`` instances, one per row.

        Raises:
            TypeError: If ``_type`` is not a class derived from ``Model``.
        """
        if not isinstance(_type, type) or not issubclass(_type, Model):
            raise TypeError('Parameter "_type" must be a Model subtype')
        entity = _type()
        table = MySQLOperation.table_name(entity)
        fields = MySQLOperation.fields_substament(entity)
        return MySQLOperation.query(f'select {fields} from {table};', _type)

    @staticmethod
    def execute(sql: str) -> int:
        """Run a data-changing statement and commit it.

        Args:
            sql: The complete SQL statement to run.

        Returns:
            The value returned by ``cursor.execute`` (the affected-row count).

        Raises:
            RuntimeError: If the pool has not been initialised.
        """
        connect = MySQLOperation._connection()
        try:
            with connect.cursor() as cursor:
                row_num = cursor.execute(sql)
            connect.commit()
        except Exception:
            # Do not hand a connection with a half-applied transaction back
            # to the pool.
            connect.rollback()
            raise
        finally:
            # Always release the connection, even when the statement failed.
            connect.close()
        return row_num

    @staticmethod
    def query(sql: str, _type: type) -> list:
        """Run a query and map each returned row onto a new ``_type`` instance.

        Result columns are matched by name against the keys of the model's
        ``_metadata``; columns without a matching key are ignored.

        Args:
            sql: The complete SQL query to run.
            _type: A ``Model`` subclass used to build the result objects.

        Returns:
            A list of ``_type`` instances; empty when the query yields no rows.

        Raises:
            TypeError: If ``_type`` is not a class derived from ``Model``.
            RuntimeError: If the pool has not been initialised.
        """
        if not isinstance(_type, type) or not issubclass(_type, Model):
            raise TypeError('Parameter "_type" must be a Model subtype')
        fields = _type()._metadata.keys()
        connect = MySQLOperation._connection()
        try:
            with connect.cursor() as cursor:
                cursor.execute(sql)
                data = cursor.fetchall() or ()
                # ``description`` is None for statements without a result set.
                description = cursor.description or ()
        finally:
            # Always release the connection, even when the query failed.
            connect.close()

        # Map each known column name to its position in a result row.
        # NOTE(review): rows are indexed by position, so this assumes the
        # cursor yields sequences rather than dicts -- confirm against Config.
        cols = {d[0]: i for i, d in enumerate(description) if d[0] in fields}
        result = []
        for row in data:
            entity = _type()
            for col, i in cols.items():
                setattr(entity, col, row[i])
            result.append(entity)
        return result

    @staticmethod
    def table_name(entity: Model) -> str:
        """Derive the table name from the entity's class name.

        The first character is lower-cased; every following character that is
        not a lowercase letter is lower-cased and prefixed with ``_``
        (``UserAccount`` becomes ``user_account``). Digits and underscores are
        not lowercase letters, so they receive the prefix as well.
        """
        class_name = entity.__class__.__name__
        letters = class_name[0].lower() + class_name[1:]
        return ''.join(
            letter if letter.islower() else f'_{letter.lower()}'
            for letter in letters
        )

    @staticmethod
    def fields_substament(entity: Model) -> str:
        """Return the entity's ``_metadata`` keys joined as a column list."""
        return ', '.join(entity._metadata.keys())

    @staticmethod
    def __values_substament(entity: Model, field_names: list = None) -> str:
        """Render the entity's field values as a comma-separated SQL fragment.

        Args:
            entity: The model instance whose attributes are read.
            field_names: Names of the fields to render; defaults to every key
                of the entity's ``_metadata``.

        Returns:
            The ``to_sql`` rendering of each selected value, joined by ``', '``.
        """
        metadata = entity._metadata
        if field_names is None:
            field_names = metadata.keys()
        field_values = [
            metadata[name].to_sql(getattr(entity, name)) for name in field_names
        ]
        return ', '.join(field_values)