Skip to content

Commit

Permalink
geography -> geometry
Browse files Browse the repository at this point in the history
  • Loading branch information
olsen232 committed Mar 3, 2021
1 parent 73e6cdc commit 00c1429
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 30 deletions.
31 changes: 22 additions & 9 deletions sno/working_copy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,8 +721,7 @@ def write_full(self, commit, *datasets, **kwargs):
self._write_meta(sess, dataset)

if dataset.has_geometry:
# This should be called while the table is still empty.
self._create_spatial_index(sess, dataset)
self._create_spatial_index_pre(sess, dataset)

L.info("Creating features...")
sql = self._insert_into_dataset(dataset)
Expand Down Expand Up @@ -759,6 +758,9 @@ def write_full(self, commit, *datasets, **kwargs):
"Overall rate: %d features/s", (feat_progress / (t1 - t0 or 0.001))
)

if dataset.has_geometry:
self._create_spatial_index_post(sess, dataset)

self._create_triggers(sess, dataset)
self._update_last_write_time(sess, dataset, commit)

Expand All @@ -774,18 +776,29 @@ def _write_meta(self, sess, dataset):
"""Write any non-feature data relating to dataset - title, description, CRS, etc."""
raise NotImplementedError()

def _create_spatial_index(self, sess, dataset):
def _create_spatial_index_pre(self, sess, dataset):
"""
Creates a spatial index for the table for the given dataset.
The spatial index is configured so that it is automatically updated when the table is modified.
It is not guaranteed that the spatial index will take into account features that are already present
in the table when this function is called - therefore, this should be called while the table is still empty.
This function comes in a pair - _pre is called before features are written, and _post is called afterwards.
Once both are called, the index must contain all the features currently in the table, and be
configured such that any further writes cause the index to be updated automatically.
"""
raise NotImplementedError()

# Note that the simplest implementation is to add a trigger here so that any further writes update
# the index. Then _create_spatial_index_post needn't be implemented.
pass

def _create_spatial_index_post(self, sess, dataset):
"""Like _create_spatial_index_pre, but runs AFTER the bulk of features have been written."""

# Being able to create the index after the bulk of features have been written could be useful for two reasons:
# 1. It might be more efficient to write the features first, then index afterwards.
# 2. Certain working copies are not able to create an index without first knowing a rough bounding box.
pass

def _drop_spatial_index(self, sess, dataset):
"""Inverse of _create_spatial_index - deletes the spatial index."""
raise NotImplementedError()
"""Inverse of _create_spatial_index_* - deletes the spatial index."""
pass

def _update_last_write_time(self, sess, dataset, commit=None):
"""Hook for updating the last-modified timestamp stored for a particular dataset, if there is one."""
Expand Down
2 changes: 1 addition & 1 deletion sno/working_copy/db_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def check_valid_db_uri(cls, db_uri, workdir_path=None):

suggestion_message = ""
if len(path_parts) == 1 and workdir_path is not None:
suggested_path = f"/{path_parts[0]}/{cls.default_schema(workdir_path)}"
suggested_path = f"/{path_parts[0]}/{cls.default_db_schema(workdir_path)}"
suggested_uri = urlunsplit(
[url.scheme, url.netloc, suggested_path, url.query, ""]
)
Expand Down
30 changes: 20 additions & 10 deletions sno/working_copy/gpkg.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,12 @@ def _delete_meta_metadata(self, sess, table_name):
)
sess.execute(stmt, {"ids": ids})

def _create_spatial_index(self, sess, dataset):
def _create_spatial_index_pre(self, sess, dataset):
# Implementing only _create_spatial_index_pre:
# gpkgAddSpatialIndex has to be called before writing any features,
# since it only adds on-write triggers to update the index - it doesn't
# add any pre-existing features to the index.

L = logging.getLogger(f"{self.__class__.__qualname__}._create_spatial_index")
geom_col = dataset.geom_column_name

Expand Down Expand Up @@ -631,6 +636,19 @@ def _apply_meta_metadata_dataset_json(self, sess, dataset, src_value, dest_value
def _update_last_write_time(self, sess, dataset, commit=None):
self._update_gpkg_contents(sess, dataset, commit)

def _get_geom_extent(self, sess, dataset):
"""Returns the envelope around the entire dataset as (min_x, min_y, max_x, max_y)."""
# FIXME: Why doesn't Extent(geom) work here as an aggregate?
geom_col = dataset.geom_column_name
r = sess.execute(
f"""
WITH _E AS
(SELECT extent({self.quote(geom_col)}) AS extent FROM {self.table_identifer(dataset)})
SELECT ST_MinX(extent), ST_MinY(extent), ST_MaxX(extent), ST_MaxY(extent) FROM _E
"""
)
return r.fetchone()

def _update_gpkg_contents(self, sess, dataset, commit=None):
"""
Update the metadata for the given table in gpkg_contents to have the new bounding-box / last-updated timestamp.
Expand All @@ -642,17 +660,9 @@ def _update_gpkg_contents(self, sess, dataset, commit=None):
# GPKG Spec Req. 15:
gpkg_change_time = change_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

table_identifer = self.table_identifier(dataset)
geom_col = dataset.geom_column_name
if geom_col is not None:
# FIXME: Why doesn't Extent(geom) work here as an aggregate?
r = sess.execute(
f"""
WITH _E AS (SELECT extent({self.quote(geom_col)}) AS extent FROM {table_identifer})
SELECT ST_MinX(extent), ST_MinY(extent), ST_MaxX(extent), ST_MaxY(extent) FROM _E
"""
)
min_x, min_y, max_x, max_y = r.fetchone()
min_x, min_y, max_x, max_y = self._get_geom_extent(sess, dataset)
rc = sess.execute(
"""
UPDATE gpkg_contents
Expand Down
5 changes: 4 additions & 1 deletion sno/working_copy/postgis.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ def delete_meta(self, dataset):
"""Delete any metadata that is only needed by this dataset."""
pass # There is no metadata except for the spatial_ref_sys table.

def _create_spatial_index(self, sess, dataset):
def _create_spatial_index_post(self, sess, dataset):
# Only implemented as _create_spatial_index_post:
# It's more efficient to write the features first, then index them all in bulk.
L = logging.getLogger(f"{self.__class__.__qualname__}._create_spatial_index")

geom_col = dataset.geom_column_name
Expand All @@ -250,6 +252,7 @@ def _create_spatial_index(self, sess, dataset):
ON {self.table_identifier(dataset)} USING GIST ({self.quote(geom_col)});
"""
)
sess.execute(f"""ANALYSE {self.table_identifier(dataset)};""")
L.info("Created spatial index in %ss", time.monotonic() - t0)

def _drop_spatial_index(self, sess, dataset):
Expand Down
59 changes: 51 additions & 8 deletions sno/working_copy/sqlserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ def _create_table_for_dataset(self, sess, dataset):

def _table_def_for_column_schema(self, col, dataset):
if col.data_type == "geometry":
# This user-defined GeographyType adapts Sno's GPKG geometry to SQL Server's native geography type.
crs_name = col.extra_type_info.get("geometryCRS", None)
crs_id = crs_util.get_identifier_int_from_dataset(dataset, crs_name) or 0
return sqlalchemy.column(col.name, GeographyType(crs_id))
# This user-defined GeometryType adapts Sno's GPKG geometry to SQL Server's native geometry type.
return sqlalchemy.column(col.name, GeometryType(crs_id))
elif col.data_type in ("date", "time", "timestamp"):
return sqlalchemy.column(col.name, BaseDateOrTimeType)
else:
Expand Down Expand Up @@ -198,20 +198,63 @@ def delete_meta(self, dataset):
# There is no metadata stored anywhere except the table itself.
pass

def _create_spatial_index(self, sess, dataset):
def _get_geom_extent(self, sess, dataset, default=None):
"""Returns the envelope around the entire dataset as (min_x, min_y, max_x, max_y)."""
geom_col = dataset.geom_column_name
r = sess.execute(
f"""
WITH _E AS (
SELECT geometry::EnvelopeAggregate({self.quote(geom_col)}) AS envelope
FROM {self.table_identifier(dataset)}
)
SELECT
envelope.STPointN(1).STX AS min_x,
envelope.STPointN(1).STY AS min_y,
envelope.STPointN(3).STX AS max_x,
envelope.STPointN(3).STY AS max_y
FROM _E;
"""
)
result = r.fetchone()
if result == (None, None, None, None) and default is not None:
return default
return result

def _grow_rectangle(self, rectangle, scale_factor):
# scale_factor = 1 -> no change, >1 -> grow, <1 -> shrink.
min_x, min_y, max_x, max_y = rectangle
centre_x, centre_y = (min_x + max_x) / 2, (min_y + max_y) / 2
min_x = (min_x - centre_x) * scale_factor + centre_x
min_y = (min_y - centre_y) * scale_factor + centre_y
max_x = (max_x - centre_x) * scale_factor + centre_x
max_y = (max_y - centre_y) * scale_factor + centre_y
return min_x, min_y, max_x, max_y

def _create_spatial_index_post(self, sess, dataset):
# Only implementing _create_spatial_index_post:
# We need to know the rough extent of the data to create an index in that area.

L = logging.getLogger(f"{self.__class__.__qualname__}._create_spatial_index")

geom_col = dataset.geom_column_name
index_name = f"{dataset.table_name}_idx_{geom_col}"

# Create the SQL Server Spatial Index
L.debug("Creating spatial index for %s.%s", dataset.table_name, geom_col)
t0 = time.monotonic()
index_name = f"{dataset.table_name}_idx_{geom_col}"
# Create an index over the extent of existing geometries + 20% room to grow.
GROW_FACTOR = 1.2
min_x, min_y, max_x, max_y = self._grow_rectangle(
self._get_geom_extent(sess, dataset, default=(-180, -90, +180, +90)),
GROW_FACTOR,
)
# Placeholders not allowed in CREATE SPATIAL INDEX - have to f-string them in.
sess.execute(
f"""
CREATE SPATIAL INDEX {self.quote(index_name)}
ON {self.table_identifier(dataset)} ({self.quote(geom_col)});
"""
ON {self.table_identifier(dataset)} ({self.quote(geom_col)})
WITH (BOUNDING_BOX = ({min_x}, {min_y}, {max_x}, {max_y}))
""",
)
L.info("Created spatial index in %ss", time.monotonic() - t0)

Expand Down Expand Up @@ -337,7 +380,7 @@ def compile_instance_function(element, compiler, **kw):
return "(%s).%s()" % (element.clauses, element.name)


class GeographyType(UserDefinedType):
class GeometryType(UserDefinedType):
"""UserDefinedType so that V2 geometry is adapted to MS binary format."""

def __init__(self, crs_id):
Expand All @@ -350,7 +393,7 @@ def bind_processor(self, dialect):
def bind_expression(self, bindvalue):
# 2. Writing - SQL layer - wrap in call to STGeomFromWKB to convert WKB to MS binary.
return Function(
quoted_name("geography::STGeomFromWKB", False),
quoted_name("geometry::STGeomFromWKB", False),
bindvalue,
self.crs_id,
type_=self,
Expand Down
2 changes: 1 addition & 1 deletion sno/working_copy/sqlserver_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def quote(ident):
"blob": "varbinary",
"date": "date",
"float": {0: "real", 32: "real", 64: "float"},
"geometry": "geography",
"geometry": "geometry",
"integer": {
0: "int",
8: "tinyint",
Expand Down

0 comments on commit 00c1429

Please sign in to comment.