-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdatabase_flow.py
89 lines (66 loc) · 2.42 KB
/
database_flow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from utils import get_kibana, generalize_sql
from data_flow_graph import format_tsv_lines, format_graphviz_lines, logs_map_and_reduce
from sql_metadata import get_query_tables
import logging
import re
def get_flow(period, limit):
    """
    Build a data-flow graph of SQL traffic from Kibana logs.

    Fetches up to `limit` log entries from the last `period` seconds,
    normalizes each SQL query, groups identical (database, query) pairs
    and emits one graph edge per group with a QPS metadata annotation.

    :param period: time window in seconds (also used as the QPS divisor)
    :param limit: maximum number of log entries to fetch from Kibana
    :return: list of edge dicts as produced by logs_map_and_reduce
    """
    logger = logging.getLogger('get_flow')
    kibana = get_kibana(period)

    # hoisted out of the per-row mapper so the pattern is compiled once
    sql_prefix = re.compile(r'^SQL ')

    # fetch DB queries
    def _map_query(row):
        # strip the leading "SQL " marker and normalize the query text
        query = generalize_sql(sql_prefix.sub('', row['@message']))
        database = row['@fields']['database']['name']
        # collapse known aliases onto a single canonical database name
        if database in ['uportal.mysql', 'default']:
            database = 'mysql'
        return (
            database,
            query,
            'php:{}'.format(row['@context']['method']),
        )

    logs = map(
        _map_query,
        kibana.query_by_string('@context.rows: *',
                               fields=['@message', '@fields.database.name', '@context.method'],
                               limit=limit)
    )
    # defensive: drop any entries the mapper could not handle
    logs = [log for log in logs if log is not None]

    # group logs using database name and normalized query, ignore method
    def _map(entry):
        return '{}-{}'.format(entry[0], entry[1])

    # called once per group of identical (database, query) entries
    def _reduce(items):
        first = items[0]
        logger.info(first)
        sql = str(first[1])
        tables = get_query_tables(sql) or ['unknown']
        # NOTE(review): assumes generalize_sql yields an upper-case verb,
        # otherwise the SELECT check below never matches — confirm
        kind = sql.split(' ')[0]
        table = '{}:{}'.format(first[0], tables[0])
        method = first[2]
        ret = {
            'source': table,
            'edge': 'SQL {}'.format(kind),
            'target': method,
            # queries-per-second over the sampled window
            'metadata': '{:.3f} QPS'.format(1. * len(items) / period)
        }
        # writes flow from code to the database, reads flow the other way:
        # reverse the direction of the graph for non-SELECT statements
        if kind not in ['SELECT']:
            ret['target'] = table
            ret['source'] = method
        return ret

    # lazy %-args: formatting is deferred until the record is actually emitted
    logger.info('Mapping %d log entries...', len(logs))
    return logs_map_and_reduce(logs, _map, _reduce)
def main():
    """Generate the database flow graph and persist it as TSV and Graphviz files."""
    logger = logging.getLogger(__name__)
    graph = get_flow(period=7200, limit=10000)  # last two hours

    # (log message, output path, formatter) for each artifact we emit
    outputs = [
        ('Saving to TSV...', 'output/database.tsv', format_tsv_lines),
        ('Saving to GV...', 'output/database.gv', format_graphviz_lines),
    ]
    for message, path, formatter in outputs:
        logger.info(message)
        with open(path, 'wt') as out:
            out.writelines(formatter(graph))

    logger.info('Done')


if __name__ == "__main__":
    main()