diff --git a/sno/base_dataset.py b/sno/base_dataset.py index 56325600f..0a25f0592 100644 --- a/sno/base_dataset.py +++ b/sno/base_dataset.py @@ -9,6 +9,10 @@ class BaseDataset(ImportSource): """ Common interface for all datasets - mainly Dataset2, but there is also Dataset0 and Dataset1 used by `sno upgrade`. + + A Dataset instance is immutable since it is a view of a particular git tree. + To get a new version of a dataset, commit the desired changes, + then instantiate a new Dataset instance that references the new git tree. """ # Constants that subclasses should generally define. diff --git a/sno/checkout.py b/sno/checkout.py index cf3f80ace..587a49011 100644 --- a/sno/checkout.py +++ b/sno/checkout.py @@ -378,7 +378,7 @@ def create_workingcopy(ctx, discard_changes, wc_path): wc_path = WorkingCopy.default_path(repo.workdir_path) if wc_path != old_wc_path: - WorkingCopy.check_valid_creation_path(repo.workdir_path, wc_path) + WorkingCopy.check_valid_creation_path(wc_path, repo.workdir_path) # Finished sanity checks - start work: if old_wc and wc_path != old_wc_path: diff --git a/sno/clone.py b/sno/clone.py index c598746ad..63574566a 100644 --- a/sno/clone.py +++ b/sno/clone.py @@ -106,7 +106,7 @@ def clone( if repo_path.exists() and any(repo_path.iterdir()): raise InvalidOperation(f'"{repo_path}" isn\'t empty', param_hint="directory") - WorkingCopy.check_valid_creation_path(repo_path, wc_path) + WorkingCopy.check_valid_creation_path(wc_path, repo_path) if not repo_path.exists(): repo_path.mkdir(parents=True) diff --git a/sno/geometry.py b/sno/geometry.py index ca42f0d75..8873c8ddd 100644 --- a/sno/geometry.py +++ b/sno/geometry.py @@ -59,6 +59,11 @@ def with_crs_id(self, crs_id): @property def crs_id(self): + """ + Returns the CRS ID as it is embedded in the GPKG header - before the WKB. + Note that datasets V2 zeroes this field before committing, + so will return zero when called on Geometry where it has been zeroed. 
+ """ wkb_offset, is_le, crs_id = parse_gpkg_geom(self) return crs_id @@ -296,6 +301,7 @@ def gpkg_geom_to_ogr(gpkg_geom, parse_crs=False): def wkt_to_gpkg_geom(wkt, **kwargs): + """Given a well-known-text string, returns a GPKG Geometry object.""" if wkt is None: return None @@ -304,6 +310,7 @@ def wkt_to_gpkg_geom(wkt, **kwargs): def wkb_to_gpkg_geom(wkb, **kwargs): + """Given a well-known-binary bytestring, returns a GPKG Geometry object.""" if wkb is None: return None @@ -312,6 +319,7 @@ def wkb_to_gpkg_geom(wkb, **kwargs): def hex_wkb_to_gpkg_geom(hex_wkb, **kwargs): + """Given a hex-encoded well-known-binary bytestring, returns a GPKG Geometry object.""" if hex_wkb is None: return None diff --git a/sno/init.py b/sno/init.py index 4f7dcc5de..a9d88b4bc 100644 --- a/sno/init.py +++ b/sno/init.py @@ -369,7 +369,7 @@ def init( if repo_path.exists() and any(repo_path.iterdir()): raise InvalidOperation(f'"{repo_path}" isn\'t empty', param_hint="directory") - WorkingCopy.check_valid_creation_path(repo_path, wc_path) + WorkingCopy.check_valid_creation_path(wc_path, repo_path) if not repo_path.exists(): repo_path.mkdir(parents=True) diff --git a/sno/repo.py b/sno/repo.py index 7fb303fa6..1e5420d1a 100644 --- a/sno/repo.py +++ b/sno/repo.py @@ -177,7 +177,7 @@ def init_repository( repo_root_path = repo_root_path.resolve() cls._ensure_exists_and_empty(repo_root_path) if not bare: - WorkingCopy.check_valid_creation_path(repo_root_path, wc_path) + WorkingCopy.check_valid_creation_path(wc_path, repo_root_path) extra_args = [] if initial_branch is not None: @@ -224,7 +224,7 @@ def clone_repository( repo_root_path = repo_root_path.resolve() cls._ensure_exists_and_empty(repo_root_path) if not bare: - WorkingCopy.check_valid_creation_path(repo_root_path, wc_path) + WorkingCopy.check_valid_creation_path(wc_path, repo_root_path) if bare: sno_repo = cls._create_with_git_command( diff --git a/sno/sqlalchemy.py b/sno/sqlalchemy.py index c3208f462..50d9cf91e 100644 --- 
a/sno/sqlalchemy.py +++ b/sno/sqlalchemy.py @@ -1,6 +1,6 @@ import re import socket -from urllib.parse import urlsplit, urlunsplit +from urllib.parse import urlsplit, urlunsplit, urlencode, parse_qs import sqlalchemy @@ -110,7 +110,7 @@ def _on_connect(psycopg2_conn, connection_record): CANONICAL_SQL_SERVER_SCHEME = "mssql" INTERNAL_SQL_SERVER_SCHEME = "mssql+pyodbc" -SQL_SERVER_DRIVER_LIB = "ODBC+Driver+17+for+SQL+Server" +SQL_SERVER_DRIVER_LIB = "ODBC Driver 17 for SQL Server" def sqlserver_engine(msurl): @@ -122,7 +122,7 @@ def sqlserver_engine(msurl): url_netloc = re.sub(r"\blocalhost\b", _replace_with_localhost, url.netloc) url_query = _append_to_query( - url.query, {"driver": SQL_SERVER_DRIVER_LIB, "Application+Name": "sno"} + url.query, {"driver": SQL_SERVER_DRIVER_LIB, "Application Name": "sno"} ) msurl = urlunsplit( @@ -137,14 +137,15 @@ def _replace_with_localhost(*args, **kwargs): return socket.gethostbyname("localhost") -def _append_query_to_url(uri, query_dict): +def _append_query_to_url(uri, new_query_dict): url = urlsplit(uri) - url_query = _append_to_query(url.query, query_dict) + url_query = _append_to_query(url.query, new_query_dict) return urlunsplit([url.scheme, url.netloc, url.path, url_query, ""]) -def _append_to_query(url_query, query_dict): - for key, value in query_dict.items(): - if key not in url_query: - url_query = "&".join(filter(None, [url_query, f"{key}={value}"])) - return url_query +def _append_to_query(existing_query, new_query_dict): + query_dict = parse_qs(existing_query) + for key, value in new_query_dict.items(): + if key not in query_dict: + query_dict[key] = value + return urlencode(query_dict) diff --git a/sno/working_copy/base.py b/sno/working_copy/base.py index 4dffdcac2..2b7acc449 100644 --- a/sno/working_copy/base.py +++ b/sno/working_copy/base.py @@ -1,10 +1,9 @@ +from enum import Enum, auto import contextlib import functools import itertools import logging import time -from enum import Enum, auto - import click import 
pygit2 @@ -48,7 +47,8 @@ def from_path(cls, path, allow_invalid=False): f"Unrecognised working copy type: {path}\n" "Try one of:\n" " PATH.gpkg\n" - " postgresql://[HOST]/DBNAME/SCHEMA\n" + " postgresql://[HOST]/DBNAME/DBSCHEMA\n" + " mssql://[HOST]/DBNAME/DBSCHEMA" ) @property @@ -100,15 +100,10 @@ class WorkingCopy: self.sno_tables - sqlalchemy Table definitions for sno_track and sno_state tables. """ - SNO_WORKINGCOPY_PATH = "sno.workingcopy.path" + WORKING_COPY_TYPE_NAME = "AbstractBaseClass" + URI_SCHEME = None - @property - @functools.lru_cache(maxsize=1) - def DB_SCHEMA(self): - """Escaped, dialect-specific name of the database-schema owned by this working copy (if any).""" - if self.db_schema is None: - raise RuntimeError("No schema to escape.") - return self.preparer.format_schema(self.db_schema) + SNO_WORKINGCOPY_PATH = "sno.workingcopy.path" @property @functools.lru_cache(maxsize=1) @@ -261,7 +256,16 @@ def write_config(cls, repo, path=None, bare=False): repo_cfg[path_key] = str(path) @classmethod - def check_valid_creation_path(cls, workdir_path, wc_path): + def subclass_from_path(cls, wc_path): + wct = WorkingCopyType.from_path(wc_path) + if wct.class_ is cls: + raise RuntimeError( + f"No subclass found - don't call subclass_from_path on concrete implementation {cls}." + ) + return wct.class_ + + @classmethod + def check_valid_creation_path(cls, wc_path, workdir_path=None): """ Given a user-supplied string describing where to put the working copy, ensures it is a valid location, and nothing already exists there that prevents us from creating it. Raises InvalidOperation if it is not. 
@@ -269,20 +273,16 @@ def check_valid_creation_path(cls, workdir_path, wc_path): """ if not wc_path: wc_path = cls.default_path(workdir_path) - WorkingCopyType.from_path(wc_path).class_.check_valid_creation_path( - workdir_path, wc_path - ) + cls.subclass_from_path(wc_path).check_valid_creation_path(wc_path, workdir_path) @classmethod - def check_valid_path(cls, workdir_path, wc_path): + def check_valid_path(cls, wc_path, workdir_path=None): """ Given a user-supplied string describing where to put the working copy, ensures it is a valid location, and nothing already exists there that prevents us from creating it. Raises InvalidOperation if it is not. Doesn't check if we have permissions to create a working copy there. """ - WorkingCopyType.from_path(wc_path).class_.check_valid_path( - workdir_path, wc_path - ) + cls.subclass_from_path(wc_path).check_valid_path(wc_path, workdir_path) def check_valid_state(self): if self.is_created(): @@ -304,19 +304,39 @@ def default_path(cls, workdir_path): return f"{stem}.gpkg" @classmethod - def normalise_path(cls, repo, path): + def normalise_path(cls, repo, wc_path): """If the path is in a non-standard form, normalise it to the equivalent standard form.""" - return WorkingCopyType.from_path(path).class_.normalise_path(repo, path) + return cls.subclass_from_path(wc_path).normalise_path(repo, wc_path) @contextlib.contextmanager def session(self, bulk=0): """ - Context manager for DB sessions, yields a session object inside a transaction - Calling again yields the _same_ session, the transaction/etc only happen in the outer one. + Context manager for GeoPackage DB sessions, yields a connection object inside a transaction - @bulk controls bulk-loading operating mode - subject to change. See ./gpkg.py + Calling again yields the _same_ session, the transaction/etc only happen in the outer one. 
""" - raise NotImplementedError() + L = logging.getLogger(f"{self.__class__.__qualname__}.session") + + if hasattr(self, "_session"): + # Inner call - reuse existing session. + L.debug("session: existing...") + yield self._session + L.debug("session: existing/done") + return + + L.debug("session: new...") + self._session = self.sessionmaker() + try: + # TODO - use tidier syntax for opening transactions from sqlalchemy. + yield self._session + self._session.commit() + except Exception: + self._session.rollback() + raise + finally: + self._session.close() + del self._session + L.debug("session: new/done") def is_created(self): """ diff --git a/sno/working_copy/db_server.py b/sno/working_copy/db_server.py new file mode 100644 index 000000000..7a29c19bd --- /dev/null +++ b/sno/working_copy/db_server.py @@ -0,0 +1,101 @@ +import functools +import re +from urllib.parse import urlsplit, urlunsplit + +import click + +from .base import WorkingCopy +from sno.exceptions import InvalidOperation + + +class DatabaseServer_WorkingCopy(WorkingCopy): + """Functionality common to working copies that connect to a database server.""" + + # Subclasses should override with the DB server URI scheme eg "postgresql". 
+ URI_SCHEME = None + + @classmethod + def check_valid_creation_path(cls, wc_path, workdir_path=None): + cls.check_valid_path(wc_path, workdir_path) + + working_copy = cls(None, wc_path) + if working_copy.has_data(): + db_schema = working_copy.db_schema + container_text = f"schema '{db_schema}'" if db_schema else "working copy" + raise InvalidOperation( + f"Error creating {cls.WORKING_COPY_TYPE_NAME} working copy at {wc_path} - " + f"non-empty {container_text} already exists" + ) + + @classmethod + def check_valid_path(cls, wc_path, workdir_path=None): + cls.check_valid_db_uri(wc_path, workdir_path) + + @classmethod + def normalise_path(cls, repo, wc_path): + return wc_path + + @classmethod + def check_valid_db_uri(cls, db_uri, workdir_path=None): + """ + For working copies that connect to a database - checks the given URI is in the required form: + >>> URI_SCHEME::[HOST]/DBNAME/DBSCHEMA + """ + url = urlsplit(db_uri) + + if url.scheme != cls.URI_SCHEME: + raise click.UsageError( + f"Invalid {cls.WORKING_COPY_TYPE_NAME} URI - " + f"Expecting URI in form: {cls.URI_SCHEME}://[HOST]/DBNAME/DBSCHEMA" + ) + + url_path = url.path + path_parts = url_path[1:].split("/", 3) if url_path else [] + + suggestion_message = "" + if len(path_parts) == 1 and workdir_path is not None: + suggested_path = f"/{path_parts[0]}/{cls.default_schema(workdir_path)}" + suggested_uri = urlunsplit( + [url.scheme, url.netloc, suggested_path, url.query, ""] + ) + suggestion_message = f"\nFor example: {suggested_uri}" + + if len(path_parts) != 2: + raise click.UsageError( + f"Invalid {cls.WORKING_COPY_TYPE_NAME} URI - URI requires both database name and database schema:\n" + f"Expecting URI in form: {cls.URI_SCHEME}://[HOST]/DBNAME/DBSCHEMA" + + suggestion_message + ) + + @classmethod + def _separate_db_schema(cls, db_uri): + """ + Removes the DBSCHEMA part off the end of a uri in the form URI_SCHEME::[HOST]/DBNAME/DBSCHEMA - + and returns the URI and the DBSCHEMA separately. 
+ Useful since generally, URI_SCHEME::[HOST]/DBNAME is what is needed to connect to the database, + and then DBSCHEMA must be specified in each query. + """ + url = urlsplit(db_uri) + url_path = url.path + path_parts = url_path[1:].split("/", 3) if url_path else [] + assert len(path_parts) == 2 + url_path = "/" + path_parts[0] + db_schema = path_parts[1] + return urlunsplit([url.scheme, url.netloc, url_path, url.query, ""]), db_schema + + @classmethod + def default_db_schema(cls, workdir_path): + """Returns a suitable default database schema - named after the folder this Sno repo is in.""" + stem = workdir_path.stem + schema = re.sub("[^a-z0-9]+", "_", stem.lower()) + "_sno" + if schema[0].isdigit(): + schema = "_" + schema + return schema + + @property + @functools.lru_cache(maxsize=1) + def DB_SCHEMA(self): + """Escaped, dialect-specific name of the database-schema owned by this working copy (if any).""" + if self.db_schema is None: + raise RuntimeError("No schema to escape.") + return self.preparer.format_schema(self.db_schema) diff --git a/sno/working_copy/gpkg.py b/sno/working_copy/gpkg.py index 9bcbb00dd..12de00dfa 100644 --- a/sno/working_copy/gpkg.py +++ b/sno/working_copy/gpkg.py @@ -26,6 +26,15 @@ class WorkingCopy_GPKG(WorkingCopy): + """ + GPKG working copy implementation. + + Requirements: + 1. Can read and write to the filesystem at the given path. 
+ """ + + WORKING_COPY_TYPE_NAME = "GPKG" + def __init__(self, repo, path): self.repo = repo self.path = path @@ -37,34 +46,34 @@ def __init__(self, repo, path): self.sno_tables = GpkgSnoTables @classmethod - def check_valid_creation_path(cls, workdir_path, path): - cls.check_valid_path(workdir_path, path) + def check_valid_creation_path(cls, wc_path, workdir_path=None): + cls.check_valid_path(wc_path, workdir_path) - gpkg_path = (workdir_path / path).resolve() + gpkg_path = (workdir_path / wc_path).resolve() if gpkg_path.exists(): desc = "path" if gpkg_path.is_dir() else "GPKG file" raise InvalidOperation( - f"Error creating GPKG working copy at {path} - {desc} already exists" + f"Error creating GPKG working copy at {wc_path} - {desc} already exists" ) @classmethod - def check_valid_path(cls, workdir_path, path): - if not str(path).endswith(".gpkg"): - suggested_path = f"{os.path.splitext(str(path))[0]}.gpkg" + def check_valid_path(cls, wc_path, workdir_path=None): + if not str(wc_path).endswith(".gpkg"): + suggested_path = f"{os.path.splitext(str(wc_path))[0]}.gpkg" raise click.UsageError( f"Invalid GPKG path - expected .gpkg suffix, eg {suggested_path}" ) @classmethod - def normalise_path(cls, repo, path): + def normalise_path(cls, repo, wc_path): """Rewrites a relative path (relative to the current directory) as relative to the repo.workdir_path.""" - path = Path(path) - if not path.is_absolute(): + wc_path = Path(wc_path) + if not wc_path.is_absolute(): try: - return str(path.resolve().relative_to(repo.workdir_path.resolve())) + return str(wc_path.resolve().relative_to(repo.workdir_path.resolve())) except ValueError: pass - return str(path) + return str(wc_path) @property def full_path(self): @@ -82,7 +91,7 @@ def _insert_or_replace_into_dataset(self, dataset): def _table_def_for_column_schema(self, col, dataset): if col.data_type == "geometry": - # This user-defined Geography type adapts WKB to SQL Server's native geography type. 
+ # This user-defined GeometryType normalises GPKG geometry to the Sno V2 GPKG geometry. return sqlalchemy.column(col.name, GeometryType) else: # Don't need to specify type information for other columns at present, since we just pass through the values. @@ -119,37 +128,36 @@ def session(self, bulk=0): # - do something consistent and safe from then on. if hasattr(self, "_session"): - # inner - reuse + # Inner call - reuse existing session. L.debug(f"session(bulk={bulk}): existing...") yield self._session L.debug(f"session(bulk={bulk}): existing/done") + return - else: - L.debug(f"session(bulk={bulk}): new...") + # Outer call - create new session: + L.debug(f"session(bulk={bulk}): new...") + self._session = self.sessionmaker() - try: - self._session = self.sessionmaker() - - if bulk: - self._session.execute("PRAGMA synchronous = OFF;") - self._session.execute( - "PRAGMA cache_size = -1048576;" - ) # -KiB => 1GiB - if bulk >= 2: - self._session.execute("PRAGMA journal_mode = MEMORY;") - self._session.execute("PRAGMA locking_mode = EXCLUSIVE;") - - # TODO - use tidier syntax for opening transactions from sqlalchemy. - self._session.execute("BEGIN TRANSACTION;") - yield self._session - self._session.commit() - except Exception: - self._session.rollback() - raise - finally: - self._session.close() - del self._session - L.debug(f"session(bulk={bulk}): new/done") + try: + if bulk: + self._session.execute("PRAGMA synchronous = OFF;") + self._session.execute("PRAGMA cache_size = -1048576;") # -KiB => 1GiB + if bulk >= 2: + self._session.execute("PRAGMA journal_mode = MEMORY;") + self._session.execute("PRAGMA locking_mode = EXCLUSIVE;") + + # TODO - use tidier syntax for opening transactions from sqlalchemy. 
+ self._session.execute("BEGIN TRANSACTION;") + yield self._session + self._session.commit() + + except Exception: + self._session.rollback() + raise + finally: + self._session.close() + del self._session + L.debug(f"session(bulk={bulk}): new/done") def delete(self, keep_db_schema_if_possible=False): """Delete the working copy files.""" diff --git a/sno/working_copy/postgis.py b/sno/working_copy/postgis.py index 611bb63ee..3a633a6b9 100644 --- a/sno/working_copy/postgis.py +++ b/sno/working_copy/postgis.py @@ -1,37 +1,36 @@ import contextlib import logging -import re import time -from urllib.parse import urlsplit, urlunsplit +from urllib.parse import urlsplit -import click from sqlalchemy.dialects.postgresql import insert as postgresql_insert from sqlalchemy.sql.compiler import IdentifierPreparer from sqlalchemy.orm import sessionmaker -from .base import WorkingCopy from . import postgis_adapter +from .db_server import DatabaseServer_WorkingCopy from .table_defs import PostgisSnoTables from sno import crs_util -from sno.exceptions import InvalidOperation from sno.schema import Schema from sno.sqlalchemy import postgis_engine -""" -* database needs to exist -* database needs to have postgis enabled -* database user needs to be able to: - 1. create 'sno' schema & tables - 2. create & alter tables in the default (or specified) schema - 3. create triggers -""" +class WorkingCopy_Postgis(DatabaseServer_WorkingCopy): + """ + PostGIS working copy implementation. -L = logging.getLogger("sno.working_copy.postgis") + Requirements: + 1. Database needs to exist + 2. PostGIS needs to be enabled (unless no geometry is used). + 3. Database user needs to be able to: + - Create the specified schema (unless it already exists). + - Create, delete and alter tables and triggers in the specified schema. 
+ """ + WORKING_COPY_TYPE_NAME = "PostGIS" + URI_SCHEME = "postgresql" -class WorkingCopy_Postgis(WorkingCopy): def __init__(self, repo, uri): """ uri: connection string of the form postgresql://[user[:password]@][netloc][:port][/dbname/schema][?param1=value1&...] @@ -42,75 +41,18 @@ def __init__(self, repo, uri): self.uri = uri self.path = uri - url = urlsplit(uri) + self.check_valid_db_uri(uri) + self.db_uri, self.db_schema = self._separate_db_schema(uri) - if url.scheme != "postgresql": - raise ValueError("Expecting postgresql://") - - url_path = url.path - path_parts = url_path[1:].split("/", 3) if url_path else [] - if len(path_parts) != 2: - raise ValueError("Expecting postgresql://[HOST]/DBNAME/SCHEMA") - url_path = f"/{path_parts[0]}" - self.db_schema = path_parts[1] - - # Rebuild DB URL suitable for postgres - self.dburl = urlunsplit([url.scheme, url.netloc, url_path, url.query, ""]) - self.engine = postgis_engine(self.dburl) + self.engine = postgis_engine(self.db_uri) self.sessionmaker = sessionmaker(bind=self.engine) self.preparer = IdentifierPreparer(self.engine.dialect) self.sno_tables = PostgisSnoTables(self.db_schema) @classmethod - def check_valid_creation_path(cls, workdir_path, path): - cls.check_valid_path(workdir_path, path) - postgis_wc = cls(None, path) - - # Less strict on Postgis - we are okay with the schema being already created, so long as its empty. 
- if postgis_wc.has_data(): - raise InvalidOperation( - f"Error creating Postgis working copy at {path} - non-empty schema already exists" - ) - - @classmethod - def check_valid_path(cls, workdir_path, path): - url = urlsplit(path) - - if url.scheme != "postgresql": - raise click.UsageError( - "Invalid postgres URI - Expecting URI in form: postgresql://[HOST]/DBNAME/SCHEMA" - ) - - url_path = url.path - path_parts = url_path[1:].split("/", 3) if url_path else [] - - suggestion_message = "" - if len(path_parts) == 1 and workdir_path is not None: - suggested_path = f"/{path_parts[0]}/{cls.default_schema(workdir_path)}" - suggested_uri = urlunsplit( - [url.scheme, url.netloc, suggested_path, url.query, ""] - ) - suggestion_message = f"\nFor example: {suggested_uri}" - - if len(path_parts) != 2: - raise click.UsageError( - "Invalid postgres URI - postgis working copy requires both dbname and schema:\n" - "Expecting URI in form: postgresql://[HOST]/DBNAME/SCHEMA" - + suggestion_message - ) - - @classmethod - def normalise_path(cls, repo, path): - return path - - @classmethod - def default_schema(cls, workdir_path): - stem = workdir_path.stem - schema = re.sub("[^a-z0-9]+", "_", stem.lower()) + "_sno" - if schema[0].isdigit(): - schema = "_" + schema - return schema + def check_valid_path(cls, wc_path, workdir_path=None): + cls.check_valid_db_uri(wc_path, workdir_path) def __str__(self): p = urlsplit(self.uri) @@ -124,44 +66,11 @@ def __str__(self): p._replace(netloc=nl) return p.geturl() - @contextlib.contextmanager - def session(self, bulk=0): - """ - Context manager for GeoPackage DB sessions, yields a connection object inside a transaction - - Calling again yields the _same_ connection, the transaction/etc only happen in the outer one. 
- """ - L = logging.getLogger(f"{self.__class__.__qualname__}.session") - - if hasattr(self, "_session"): - # inner - reuse - L.debug("session: existing...") - yield self._session - L.debug("session: existing/done") - - else: - L.debug("session: new...") - - try: - self._session = self.sessionmaker() - - # TODO - use tidier syntax for opening transactions from sqlalchemy. - self._session.execute("BEGIN TRANSACTION;") - yield self._session - self._session.commit() - except Exception: - self._session.rollback() - raise - finally: - self._session.close() - del self._session - L.debug("session: new/done") - def is_created(self): """ - Returns true if the postgres schema referred to by this working copy exists and + Returns true if the DB schema referred to by this working copy exists and contains at least one table. If it exists but is empty, it is treated as uncreated. - This is so the postgres schema can be created ahead of time before a repo is created + This is so the DB schema can be created ahead of time before a repo is created or configured, without it triggering code that checks for corrupted working copies. Note that it might not be initialised as a working copy - see self.is_initialised. """ @@ -177,7 +86,7 @@ def is_created(self): def is_initialised(self): """ - Returns true if the postgis working copy is initialised - + Returns true if the PostGIS working copy is initialised - the schema exists and has the necessary sno tables, _sno_state and _sno_track. """ with self.session() as sess: @@ -192,7 +101,7 @@ def is_initialised(self): def has_data(self): """ - Returns true if the postgis working copy seems to have user-created content already. + Returns true if the PostGIS working copy seems to have user-created content already. 
""" with self.session() as sess: count = sess.scalar( diff --git a/sno/working_copy/sqlserver.py b/sno/working_copy/sqlserver.py index ee4d8bc33..3870bfe34 100644 --- a/sno/working_copy/sqlserver.py +++ b/sno/working_copy/sqlserver.py @@ -1,10 +1,8 @@ import contextlib import logging -import re import time -from urllib.parse import urlsplit, urlunsplit +from urllib.parse import urlsplit -import click import sqlalchemy from sqlalchemy import literal_column from sqlalchemy.ext.compiler import compiles @@ -15,28 +13,28 @@ from sqlalchemy.sql.compiler import IdentifierPreparer from sqlalchemy.types import UserDefinedType -from .base import WorkingCopy from . import sqlserver_adapter +from .db_server import DatabaseServer_WorkingCopy from .table_defs import SqlServerSnoTables from sno import crs_util from sno.geometry import Geometry -from sno.exceptions import InvalidOperation from sno.sqlalchemy import sqlserver_engine -""" -* database needs to exist -* database needs to have postgis enabled -* database user needs to be able to: - 1. create 'sno' schema & tables - 2. create & alter tables in the default (or specified) schema - 3. create triggers -""" +class WorkingCopy_SqlServer(DatabaseServer_WorkingCopy): + """ + SQL Server working copy implementation. -L = logging.getLogger("sno.working_copy.postgis") + Requirements: + 1. Database needs to exist + 2. Database user needs to be able to: + - Create the specified schema (unless it already exists). + - Create, delete and alter tables and triggers in the specified schema. + """ + WORKING_COPY_TYPE_NAME = "SQL Server" + URI_SCHEME = "mssql" -class WorkingCopy_SqlServer(WorkingCopy): def __init__(self, repo, uri): """ uri: connection string of the form mssql://[user[:password]@][netloc][:port][/dbname/schema][?param1=value1&...] 
@@ -47,78 +45,15 @@ def __init__(self, repo, uri): self.uri = uri self.path = uri - url = urlsplit(uri) - - if url.scheme != "mssql": - raise ValueError("Expecting mssql://") + self.check_valid_db_uri(uri) + self.db_uri, self.db_schema = self._separate_db_schema(uri) - url_path = url.path - path_parts = url_path[1:].split("/", 3) if url_path else [] - if len(path_parts) != 2: - raise ValueError("Expecting mssql://[HOST]/DBNAME/SCHEMA") - url_path = f"/{path_parts[0]}" - self.db_schema = path_parts[1] - - # Rebuild DB URL suitable for sqlserver_engine. - self.dburl = urlunsplit([url.scheme, url.netloc, url_path, url.query, ""]) - self.engine = sqlserver_engine(self.dburl) + self.engine = sqlserver_engine(self.db_uri) self.sessionmaker = sessionmaker(bind=self.engine) self.preparer = IdentifierPreparer(self.engine.dialect) self.sno_tables = SqlServerSnoTables(self.db_schema) - @classmethod - def check_valid_creation_path(cls, workdir_path, path): - # TODO - promote to superclass - cls.check_valid_path(workdir_path, path) - sqlserver_wc = cls(None, path) - - # We are okay with the schema being already created, so long as its empty. 
- if sqlserver_wc.has_data(): - raise InvalidOperation( - f"Error creating SQL Server working copy at {path} - non-empty schema already exists" - ) - - @classmethod - def check_valid_path(cls, workdir_path, path): - url = urlsplit(path) - - if url.scheme != "mssql": - raise click.UsageError( - "Invalid postgres URI - Expecting URI in form: mssql://[HOST]/DBNAME/SCHEMA" - ) - - url_path = url.path - path_parts = url_path[1:].split("/", 3) if url_path else [] - - suggestion_message = "" - if len(path_parts) == 1 and workdir_path is not None: - suggested_path = f"/{path_parts[0]}/{cls.default_schema(workdir_path)}" - suggested_uri = urlunsplit( - [url.scheme, url.netloc, suggested_path, url.query, ""] - ) - suggestion_message = f"\nFor example: {suggested_uri}" - - if len(path_parts) != 2: - raise click.UsageError( - "Invalid mssql URI - SQL Server working copy requires both dbname and schema:\n" - "Expecting URI in form: mssql://[HOST]/DBNAME/SCHEMA" - + suggestion_message - ) - - @classmethod - def normalise_path(cls, repo, path): - return path - - @classmethod - def default_schema(cls, workdir_path): - # TODO - promote to superclass - stem = workdir_path.stem - schema = re.sub("[^a-z0-9]+", "_", stem.lower()) + "_sno" - if schema[0].isdigit(): - schema = "_" + schema - return schema - def __str__(self): p = urlsplit(self.uri) if p.password is not None: @@ -131,38 +66,6 @@ def __str__(self): p._replace(netloc=nl) return p.geturl() - @contextlib.contextmanager - def session(self, bulk=0): - """ - Context manager for GeoPackage DB sessions, yields a connection object inside a transaction - - Calling again yields the _same_ connection, the transaction/etc only happen in the outer one. 
- """ - L = logging.getLogger(f"{self.__class__.__qualname__}.session") - - if hasattr(self, "_session"): - # inner - reuse - L.debug("session: existing...") - yield self._session - L.debug("session: existing/done") - - else: - L.debug("session: new...") - - try: - self._session = self.sessionmaker() - - # TODO - use tidier syntax for opening transactions from sqlalchemy. - yield self._session - self._session.commit() - except Exception: - self._session.rollback() - raise - finally: - self._session.close() - del self._session - L.debug("session: new/done") - def is_created(self): """ Returns true if the db schema referred to by this working copy exists and @@ -180,7 +83,7 @@ def is_created(self): def is_initialised(self): """ - Returns true if the postgis working copy is initialised - + Returns true if the SQL server working copy is initialised - the schema exists and has the necessary sno tables, _sno_state and _sno_track. """ with self.session() as sess: @@ -196,7 +99,7 @@ def is_initialised(self): def has_data(self): """ - Returns true if the postgis working copy seems to have user-created content already. + Returns true if the SQL server working copy seems to have user-created content already. """ with self.session() as sess: count = sess.scalar( @@ -245,7 +148,7 @@ def _create_table_for_dataset(self, sess, dataset): def _table_def_for_column_schema(self, col, dataset): if col.data_type == "geometry": - # This user-defined Geography type adapts WKB to SQL Server's native geography type. + # This user-defined GeographyType adapts Sno's GPKG geometry to SQL Server's native geography type. crs_name = col.extra_type_info.get("geometryCRS", None) crs_id = crs_util.get_identifier_int_from_dataset(dataset, crs_name) or 0 return sqlalchemy.column(col.name, GeographyType(crs_id)) @@ -282,7 +185,7 @@ def _insert_or_replace_state_table_tree(self, sess, tree_id): return r.rowcount def _write_meta(self, sess, dataset): - """Write the title (as a comment) and the CRS. 
Other metadata is not stored in a PostGIS WC.""" + """Write the title. Other metadata is not stored in a SQL Server WC.""" self._write_meta_title(sess, dataset) def _write_meta_title(self, sess, dataset): @@ -292,7 +195,8 @@ def _write_meta_title(self, sess, dataset): def delete_meta(self, dataset): """Delete any metadata that is only needed by this dataset.""" - pass # There is no metadata except for the spatial_ref_sys table. + # There is no metadata stored anywhere except the table itself. + pass def _create_spatial_index(self, sess, dataset): L = logging.getLogger(f"{self.__class__.__qualname__}._create_spatial_index") @@ -352,24 +256,6 @@ def _suspend_triggers(self, sess, dataset): def meta_items(self, dataset): with self.session() as sess: - table_info_sql = """ - SELECT - C.column_name, C.ordinal_position, C.data_type, C.udt_name, - C.character_maximum_length, C.numeric_precision, C.numeric_scale, - KCU.ordinal_position AS pk_ordinal_position, - upper(postgis_typmod_type(A.atttypmod)) AS geometry_type, - postgis_typmod_srid(A.atttypmod) AS geometry_srid - FROM information_schema.columns C - LEFT OUTER JOIN information_schema.key_column_usage KCU - ON (KCU.table_schema = C.table_schema) - AND (KCU.table_name = C.table_name) - AND (KCU.column_name = C.column_name) - LEFT OUTER JOIN pg_attribute A - ON (A.attname = C.column_name) - AND (A.attrelid = (C.table_schema || '.' || C.table_name)::regclass::oid) - WHERE C.table_schema=:table_schema AND C.table_name=:table_name - ORDER BY C.ordinal_position; - """ table_info_sql = """ SELECT C.column_name, C.ordinal_position, C.data_type, @@ -405,10 +291,6 @@ def try_align_schema_col(cls, old_col_dict, new_col_dict): old_type = old_col_dict["dataType"] new_type = new_col_dict["dataType"] - # Some types have to be approximated as other types in SQL Server. 
- if sqlserver_adapter.APPROXIMATED_TYPES.get(old_type) == new_type: - new_col_dict["dataType"] = new_type = old_type - # Geometry type loses its extra type info when roundtripped through SQL Server. if new_type == "geometry": new_col_dict["geometryType"] = old_col_dict.get("geometryType") diff --git a/sno/working_copy/sqlserver_adapter.py b/sno/working_copy/sqlserver_adapter.py index de64fe778..5246b5154 100644 --- a/sno/working_copy/sqlserver_adapter.py +++ b/sno/working_copy/sqlserver_adapter.py @@ -1,4 +1,3 @@ -from sno import crs_util from sno.schema import Schema, ColumnSchema from sqlalchemy.sql.compiler import IdentifierPreparer @@ -25,7 +24,7 @@ def quote(ident): 32: "int", 64: "bigint", }, - "interval": "nvarchar", + "interval": "text", "numeric": "numeric", "text": "nvarchar", "time": "time", @@ -47,7 +46,6 @@ def quote(ident): "decimal": "numeric", "geography": "geometry", "geometry": "geometry", - "interval": "interval", "numeric": "numeric", "nvarchar": "text", "text": "text", @@ -91,13 +89,6 @@ def v2_type_to_ms_type(column_schema, v2_obj): return ms_type_info.get(extra_type_info.get("size", 0)) ms_type = ms_type_info - if ms_type == "geometry": - geometry_type = extra_type_info.get("geometryType") - crs_name = extra_type_info.get("geometryCRS") - crs_id = None - if crs_name is not None: - crs_id = crs_util.get_identifier_int_from_dataset(v2_obj, crs_name) - return _v2_geometry_type_to_ms_type(geometry_type, crs_id) if ms_type in ("varchar", "nvarchar", "varbinary"): length = extra_type_info.get("length", None) @@ -116,33 +107,19 @@ def v2_type_to_ms_type(column_schema, v2_obj): return ms_type -def _v2_geometry_type_to_ms_type(geometry_type, crs_id): - if geometry_type is not None: - geometry_type = geometry_type.replace(" ", "") - - if geometry_type is not None and crs_id is not None: - return f"geometry({geometry_type},{crs_id})" - elif geometry_type is not None: - return f"geometry({geometry_type})" - else: - return "geometry" - - def 
sqlserver_to_v2_schema(ms_table_info, id_salt): - """Generate a V2 schema from the given postgis metadata tables.""" + """Generate a V2 schema from the given SQL server metadata.""" return Schema([_sqlserver_to_column_schema(col, id_salt) for col in ms_table_info]) def _sqlserver_to_column_schema(ms_col_info, id_salt): """ - Given the postgis column info for a particular column, and some extra context in - case it is a geometry column, converts it to a ColumnSchema. The extra context will - only be used if the given ms_col_info is the geometry column. + Given the MS column info for a particular column, converts it to a ColumnSchema. + Parameters: ms_col_info - info about a single column from ms_table_info. - ms_spatial_ref_sys - rows of the "spatial_ref_sys" table that are referenced by this dataset. id_salt - the UUIDs of the generated ColumnSchema are deterministic and depend on - the name and type of the column, and on this salt. + the name and type of the column, and on this salt. """ name = ms_col_info["column_name"] pk_index = ms_col_info["pk_ordinal_position"] diff --git a/sno/working_copy/table_defs.py b/sno/working_copy/table_defs.py index 5f8fbd2dc..ef500911c 100644 --- a/sno/working_copy/table_defs.py +++ b/sno/working_copy/table_defs.py @@ -104,9 +104,11 @@ def create_all(self, session): class SqlServerSnoTables(TableSet): """ - Tables for sno-specific metadata - PostGIS variant. + Tables for sno-specific metadata - SQL Server variant. Table names have a user-defined schema, and so unlike other table sets, we need to construct an instance with the appropriate schema. + Primary keys have to be NVARCHAR of a fixed maximum length - + if the total maximum length is too long, SQL Server cannot generate an index. 
""" def __init__(self, schema=None): diff --git a/tests/conftest.py b/tests/conftest.py index c87c0374d..d060032a4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -932,7 +932,7 @@ def sqlserver_db(): """ Using docker, you can run a SQL Server test - such as those in test_working_copy_sqlserver - as follows: docker run -it --rm -d -p 11433:1433 -e ACCEPT_EULA=Y -e 'SA_PASSWORD=Sql(server)' mcr.microsoft.com/mssql/server - SNO_SQLSERVER_URL='mssql://sa:Sql(server)@127.0.0.1:11433/master' spytest -k sqlserver --pdb -vvs + SNO_SQLSERVER_URL='mssql://sa:Sql(server)@127.0.0.1:11433/master' pytest -k sqlserver --pdb -vvs """ if "SNO_SQLSERVER_URL" not in os.environ: raise pytest.skip( @@ -940,7 +940,7 @@ def sqlserver_db(): ) engine = sqlserver_engine(os.environ["SNO_SQLSERVER_URL"]) with engine.connect() as conn: - # test connection and postgis support + # Test connection try: conn.execute("SELECT @@version;") except sqlalchemy.exc.DBAPIError: