-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpoly.py
175 lines (153 loc) · 7.64 KB
/
poly.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import random
from enum import Enum
from typing import List, Optional, Union

import matplotlib.pyplot as plt
from redis import Connection, Redis, from_url
from shapely import Point, Polygon, MultiPoint
# URL of the Redis server the demo connects to.
REDIS_URL = "redis://localhost:6379"

# Extent of the square area in which unconstrained random points are drawn.
MIN_X, MAX_X = 0, 10
MIN_Y, MAX_Y = 0, 10
class QUERY(Enum):
    """Geometry predicates; the value is inserted into the @geom clause of the search query."""

    WITHIN = "within"
    CONTAINS = "contains"
class COLOR(Enum):
    """Colors of the demo shapes.

    The value is used both as the matplotlib color when plotting and as the
    term excluded from the name field when searching.
    """

    # Colors assigned to the polygons.
    RED = "red"
    GREEN = "green"
    BLUE = "blue"
    CYAN = "cyan"

    # Colors assigned to the points.
    PURPLE = "purple"
    BROWN = "brown"
    ORANGE = "orange"
    OLIVE = "olive"
class SHAPE(Enum):
    """Kinds of stored shapes; the value is the term matched against the name field in searches."""

    POLYGON = "Polygon"
    POINT = "Point"
class PolygonDemo(object):
    """ Demonstrates Redis GEOSHAPE WITHIN and CONTAINS searches over random, nested shapes.

    Shapes are generated with Shapely, stored in Redis as JSON documents (name + WKT geometry),
    queried through raw FT.SEARCH commands and finally plotted with matplotlib.
    """

    # Colors of the nested polygons, outermost first.
    _POLYGON_COLORS = (COLOR.RED, COLOR.GREEN, COLOR.BLUE, COLOR.CYAN)
    # Colors of the points; the first point is placed inside the innermost polygon.
    _POINT_COLORS = (COLOR.PURPLE, COLOR.BROWN, COLOR.ORANGE, COLOR.OLIVE)

    def __init__(self):
        """ Connects to the Redis server at REDIS_URL and empties the selected database.

        WARNING: flushdb() deletes every key in the database the client is connected to.
        """
        # from_url() hands back a Redis client object, not a low-level Connection.
        self.client: Redis = from_url(REDIS_URL)
        self.client.flushdb()

    def demo(self) -> None:
        """ Public function that creates 4 random polygons and 4 random points. The polygons are layered such that
        the 4th is contained by the 3rd, the 3rd by the 2nd, etc., and one point is placed inside each polygon.
        Each shape is stored as a JSON object in Redis with its name and WKT parameters. Finally, a series
        of searches exercising the Redis WITHIN and CONTAINS queries are executed and the shapes are plotted.
        The index creation and the searches are sent to Redis as raw commands via execute_command.

        Returns
        -------
        None
        """
        # Every polygon is the convex hull of points sampled inside the previous one,
        # which is what produces the red > green > blue > cyan nesting.
        polygons = []
        box: Optional[Polygon] = None
        for color in self._POLYGON_COLORS:
            box = self._get_polygon(box)
            polygons.append((color, box))

        # Points are paired with the polygons from the inside out:
        # purple in cyan, brown in blue, orange in green, olive in red.
        points = [
            (color, self._get_point(poly))
            for color, (_, poly) in zip(self._POINT_COLORS, reversed(polygons))
        ]

        self.client.execute_command('FT.CREATE', 'idx', 'ON', 'JSON', 'PREFIX', '1', 'key:',
            'SCHEMA', '$.name', 'AS', 'name', 'TEXT', '$.geom', 'AS', 'geom', 'GEOSHAPE', 'FLAT')

        # Documents key:1..key:4 hold the polygons, key:5..key:8 the points.
        shapes = [(color, SHAPE.POLYGON, geom) for color, geom in polygons]
        shapes += [(color, SHAPE.POINT, geom) for color, geom in points]
        for key_id, (color, kind, geom) in enumerate(shapes, start=1):
            self.client.json().set(f'key:{key_id}', '$', {"name": self._label(color, kind), "geom": geom.wkt})

        polygon_shapes = shapes[:len(polygons)]
        # (predicate, verb used in the heading, kind of shape to return, query shapes)
        searches = (
            (QUERY.WITHIN, 'within', SHAPE.POLYGON, polygon_shapes),
            (QUERY.WITHIN, 'within', SHAPE.POINT, polygon_shapes),
            (QUERY.CONTAINS, 'containing', SHAPE.POLYGON, shapes),
        )
        for qt, verb, wanted, subjects in searches:
            for color, kind, geom in subjects:
                print(f'\n*** {wanted.value}s {verb} the {self._label(color, kind)} ***')
                self._poly_search(qt, color, geom, wanted)

        for color, poly in polygons:
            plt.plot(*poly.exterior.xy, c=color.value)
        for color, point in points:
            plt.scatter(point.x, point.y, c=color.value)
        plt.show()

    @staticmethod
    def _label(color: COLOR, kind: SHAPE) -> str:
        """ Private function building the display name of a shape, e.g. 'Red Polygon'.

        Parameters
        ----------
        color - color of the shape
        kind - kind of the shape (polygon or point)

        Returns
        -------
        Capitalized color followed by the shape kind
        """
        return f'{color.value.capitalize()} {kind.value}'

    def _poly_search(self, qt: QUERY, color: COLOR, shape: Union[Point, Polygon], filter: SHAPE) -> None:
        """ Private function for POLYGON search in Redis. Prints the name of every hit, or 'None' when
        the search reports no hits.

        Parameters
        ----------
        qt - Redis Geometry search type (contains or within)
        color - color attribute of the query shape; documents whose name matches it are excluded
        shape - Shapely point or polygon object
        filter - query filter on shape types (polygon or point) to be returned

        Returns
        -------
        None
        """
        query = f'(-@name:{color.value} @name:{filter.value} @geom:[{qt.value} $qshape])'
        results: list = self.client.execute_command(
            'FT.SEARCH', 'idx', query,
            'PARAMS', '2', 'qshape', shape.wkt,
            'RETURN', '1', 'name',
            'DIALECT', '3')
        # The first element of the reply is the hit count; the nested lists carry the returned fields.
        if results[0] > 0:
            for res in results:
                # Skip the count and the document keys, and any field list without a value.
                if isinstance(res, list) and len(res) > 1:
                    value = res[1]
                    if isinstance(value, bytes):
                        value = value.decode('utf-8')
                    # Presumably the value arrives wrapped as a JSON array (["..."]) - confirm for DIALECT 3.
                    print(value.strip('[]"'))
        else:
            print('None')

    def _get_point(self, box: Optional[Polygon] = None) -> Point:
        """ Private function to generate a random point, potentially within a bounding box

        Parameters
        ----------
        box - Optional bounding box; when missing or empty the point is drawn from the
              MIN_X..MAX_X / MIN_Y..MAX_Y area instead

        Returns
        -------
        Shapely Point object

        Raises
        ------
        ValueError - if box is not empty but has zero area, so no random point could ever land inside it
        """
        if box is None or box.is_empty:
            return Point(random.uniform(MIN_X, MAX_X), random.uniform(MIN_Y, MAX_Y))
        if box.area == 0:
            # Without this guard the rejection loop below would never terminate.
            raise ValueError('cannot sample a point inside a zero-area box')

        # Rejection sampling: draw from the bounding rectangle until the point falls inside the box.
        minx, miny, maxx, maxy = box.bounds
        while True:
            candidate = Point(random.uniform(minx, maxx), random.uniform(miny, maxy))
            if box.contains(candidate):
                return candidate

    def _get_polygon(self, box: Optional[Polygon] = None) -> Polygon:
        """ Private function to generate a random polygon, potentially within a bounding box

        Parameters
        ----------
        box - Optional bounding box

        Returns
        -------
        Shapely Polygon object (convex hull of 3 to 10 random points)
        """
        points: List[Point] = [self._get_point(box) for _ in range(random.randint(3, 10))]
        hull = MultiPoint(points).convex_hull
        return Polygon(hull)
if __name__ == '__main__':
    # Script entry point: build the demo (connects to Redis) and run it end to end.
    polygon_demo = PolygonDemo()
    polygon_demo.demo()