-
Notifications
You must be signed in to change notification settings - Fork 69
/
query_builder.py
183 lines (142 loc) · 7.64 KB
/
query_builder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
from __future__ import absolute_import
from builtins import object
#!/usr/bin/python
# -*- coding: UTF-8 -*-
__author__ = 'carolinux'
STRINGCAST_FORMAT = 'cast("{}" as character) {} \'{}\' AND cast("{}" as character) >= \'{}\' '
INT_FORMAT = "{} {} {} AND {} >= {} "
STRING_FORMAT = "\"{}\" {} '{}' AND \"{}\" >= '{}' "
from timemanager.utils import time_util
class QueryIdioms(object):
OGR = "OGR"
SQL = "SQL"
class QueryBuildingException(Exception):
pass
def can_compare_lexicographically(date_format):
"""Can only compare lexicographically when the order of appearance in the string
is year, month, date"""
# fortunately, valid date formats cannot have the same %x twice
ioy = date_format.find("%Y")
iom = date_format.find("%m")
iod = date_format.find("%d")
ioh = date_format.find("%H")
iomin = date_format.find("%M")
ios = date_format.find("%S")
return ioy <= iom and iom <= iod and (iod <= ioh or ioh == -1) and\
(ioh <= iomin or iomin == -1) and (iomin <= ios or ios == -1)
def create_ymd_substring(ioy, iom, iod, ioh, col, quote_type):
q = quote_type
ystr = "SUBSTR({}{}{},{},{})".format(q, col, q, ioy + 1,
ioy + 5) if ioy >= 0 else None # adding 1 because SQL indexing is 1-based
mstr = "SUBSTR({}{}{},{},{})".format(q, col, q, iom + 1, iom + 3) if iom >= 0 else None
dstr = "SUBSTR({}{}{},{},{})".format(q, col, q, iod + 1, iod + 3) if iod >= 0 else None
max_index = max(ioy, iom, iod)
ior = max_index + (2 if max_index != ioy else 4) # find where the rest of the string is
reststr = "SUBSTR({}{}{},{},{})".format(q, col, q, ior + 1,
ior + 1 + 8 + 1 + 6) if ioh >= 0 else None
string_components = [x for x in [ystr, mstr, dstr, reststr] if x is not None]
return ",".join(string_components)
def likeBC(attr, cast=False):
if not cast:
return '"{}" LIKE \'%BC\''.format(attr)
else:
return ' cast("{}" as character) LIKE \'%BC\''.format(attr)
def likeAD(attr, cast=False):
if not cast:
return '"{}" LIKE \'%AD\''.format(attr)
else:
return ' cast("{}" as character) LIKE \'%AD\''.format(attr)
AND = " AND "
OR = " OR "
def NOT(q):
return "NOT ({})".format(q)
def lessThan(val, col, equals=False, cast=False):
comparison = '<' if not equals else '<='
if not cast:
return " '{}' {} \"{}\" ".format(val, comparison, col)
else:
return " '{}' {} cast(\"{}\" as character) ".format(val, comparison, col)
def greaterThan(val, col, equals=False, cast=False):
comparison = '>' if not equals else '>='
if not cast:
return " '{}' {} \"{}\" ".format(val, comparison, col)
else:
return " '{}' {} cast(\"{}\" as character) ".format(val, comparison, col)
def isAfter(col, val, equals=False, bc=False, cast=False):
if not bc:
return lessThan(val, col, equals, cast=False)
else:
return greaterThan(val, col, equals, cast)
def isBefore(col, val, equals=False, bc=False, cast=False):
return isAfter(col, val, equals, not bc, cast)
def paren(q):
return "( " + q + " )"
# start_attr <- from_attr, end_attr <- to_attr
def build_query_archaelogical(start_str, end_str, start_attr, end_attr, comparison, query_idiom):
"""Build subset query for archaeology mode taking in account BC and AC markers"""
cast = query_idiom == QueryIdioms.OGR # if it's OGR need to cast as string
if "BC" in start_str and "BC" in end_str:
# for BC need to invert the order of comparisons
return paren(paren(
likeBC(end_attr, cast=cast) + AND + isAfter(col=end_attr, val=start_str, equals=True,
bc=True, cast=cast)) + OR + likeAD(
end_attr, cast=cast)) \
+ AND \
+ paren(likeBC(start_attr, cast=cast) + AND + isBefore(col=start_attr, val=end_str,
equals=('=' in comparison),
bc=True, cast=cast))
if "AD" in start_str and "AD" in end_str:
return paren(
likeAD(end_attr, cast=cast) + AND + isAfter(col=end_attr, val=start_str, equals=True,
bc=False, cast=cast)) \
+ AND \
+ paren(likeBC(start_attr, cast=cast) + OR + paren(
isBefore(col=start_attr, val=end_str, equals=('=' in comparison), bc=False, cast=cast)
+ AND + likeAD(start_attr, cast=cast)))
# can only be start_attr = BC and end_attr = AD
return paren(
NOT(likeAD(start_attr, cast=cast)) + OR + paren(likeAD(start_attr, cast=cast) + AND
+ greaterThan(val=end_str, col=start_attr,
equals=('=' in comparison),
cast=cast))) \
+ AND \
+ paren(NOT(likeBC(end_attr, cast=cast)) + OR + paren(likeBC(end_attr, cast=cast) + AND
+ greaterThan(val=start_str,
col=end_attr,
equals=True,
cast=cast)))
def build_query(start_dt, end_dt, from_attr, to_attr, date_type, date_format, query_idiom, acc):
"""Build subset query"""
if acc: # features never die
start_dt = time_util.get_min_dt()
comparison = "<" # simplified because of: https://github.com/anitagraser/TimeManager/issues/235
# (original: # comparison = "<" if to_attr == from_attr else "<=")
if date_type == time_util.DateTypes.IntegerTimestamps:
start_epoch = time_util.datetime_to_epoch(start_dt)
end_epoch = time_util.datetime_to_epoch(end_dt)
return INT_FORMAT.format(from_attr, comparison, end_epoch, to_attr, start_epoch)
start_str = time_util.datetime_to_str(start_dt, date_format)
end_str = time_util.datetime_to_str(end_dt, date_format)
if date_type == time_util.DateTypes.DatesAsStringsArchaelogical:
# kept <= option here since I'm not sure about implications in archaelogical mode
comparison = "<" if to_attr == from_attr else "<="
return build_query_archaelogical(start_str, end_str, from_attr, to_attr, comparison, query_idiom)
if can_compare_lexicographically(date_format):
if query_idiom == QueryIdioms.OGR:
return STRINGCAST_FORMAT.format(from_attr, comparison, end_str, to_attr, start_str)
else:
return STRING_FORMAT.format(from_attr, comparison, end_str, to_attr, start_str)
else:
# thankfully, SQL & OGR syntax agree on substr and concat
if date_type != time_util.DateTypes.DatesAsStrings:
raise QueryBuildingException()
ioy = date_format.find("%Y")
iom = date_format.find("%m")
iod = date_format.find("%d")
ioh = date_format.find("%H")
sub1 = create_ymd_substring(ioy, iom, iod, ioh, from_attr, quote_type='"') # quote type for column names
sub2 = create_ymd_substring(ioy, iom, iod, ioh, end_str, quote_type='\'') # quote type for values
sub3 = create_ymd_substring(ioy, iom, iod, ioh, to_attr, quote_type='"')
sub4 = create_ymd_substring(ioy, iom, iod, ioh, start_str, quote_type='\'')
query = "CONCAT({}) {} CONCAT({}) AND CONCAT({})>=CONCAT({})".format(sub1, comparison, sub2, sub3, sub4)
return query