-
-
Notifications
You must be signed in to change notification settings - Fork 340
/
Copy pathsql.py
99 lines (85 loc) · 3.2 KB
/
sql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import logging
import traceback
from django.core.exceptions import EmptyResultSet
from django.utils import timezone
from silk.collector import DataCollector
from silk.config import SilkyConfig
Logger = logging.getLogger('silk.sql')
def _should_wrap(sql_query):
if not DataCollector().request:
return False
for ignore_str in SilkyConfig().SILKY_IGNORE_QUERIES:
if ignore_str in sql_query:
return False
return True
def _unpack_explanation(result):
for row in result:
if not isinstance(row, str):
yield ' '.join(str(c) for c in row)
else:
yield row
def _explain_query(connection, q, params):
if connection.features.supports_explaining_query_execution:
if SilkyConfig().SILKY_ANALYZE_QUERIES:
# Work around some DB engines not supporting analyze option
try:
prefix = connection.ops.explain_query_prefix(
analyze=True, **(SilkyConfig().SILKY_EXPLAIN_FLAGS or {})
)
except ValueError as error:
error_str = str(error)
if error_str.startswith("Unknown options:"):
Logger.warning(
"Database does not support analyzing queries with provided params. %s."
"SILKY_ANALYZE_QUERIES option will be ignored",
error_str
)
prefix = connection.ops.explain_query_prefix()
else:
raise error
else:
prefix = connection.ops.explain_query_prefix()
# currently we cannot use explain() method
# for queries other than `select`
prefixed_query = f"{prefix} {q}"
with connection.cursor() as cur:
cur.execute(prefixed_query, params)
result = _unpack_explanation(cur.fetchall())
return '\n'.join(result)
return None
def execute_sql(self, *args, **kwargs):
"""wrapper around real execute_sql in order to extract information"""
try:
q, params = self.as_sql()
if not q:
raise EmptyResultSet
except EmptyResultSet:
try:
result_type = args[0]
except IndexError:
result_type = kwargs.get('result_type', 'multi')
if result_type == 'multi':
return iter([])
else:
return
tb = ''.join(reversed(traceback.format_stack()))
sql_query = q % params
if _should_wrap(sql_query):
query_dict = {
'query': sql_query,
'start_time': timezone.now(),
'traceback': tb
}
try:
return self._execute_sql(*args, **kwargs)
finally:
query_dict['end_time'] = timezone.now()
request = DataCollector().request
if request:
query_dict['request'] = request
if self.query.model.__module__ != 'silk.models':
query_dict['analysis'] = _explain_query(self.connection, q, params)
DataCollector().register_query(query_dict)
else:
DataCollector().register_silk_query(query_dict)
return self._execute_sql(*args, **kwargs)