diff --git a/.github/workflows/python-test.yml b/.github/workflows/python-test.yml new file mode 100644 index 0000000..9dcaf28 --- /dev/null +++ b/.github/workflows/python-test.yml @@ -0,0 +1,41 @@ +name: Python Test + +on: + push: + branches: ["main"] + paths-ignore: ["README.md", "docs", ".github"] + pull_request: + branches: ["main"] + paths-ignore: ["README.md", "docs", ".github"] + +jobs: + build: + runs-on: ubuntu-22.04 + strategy: + fail-fast: false + matrix: + python-version: ["3.11"] + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version }} + + - name: Setup Rust + uses: actions-rust-lang/setup-rust-toolchain@v1 + - name: Install tooling dependencies + run: | + python -m pip install --upgrade pip + pip install maturin + - name: Install Dependencies + run: | + pip install pytest polars pyarrow pytest-asyncio pyright python-dotenv docker cffi + - name: Install Project + run: maturin develop + - name: pytest + shell: bash + run: pytest + - name: Pyright + run: pyright . diff --git a/lakeapi2sql/sql_connection.py b/lakeapi2sql/sql_connection.py index 2bcbf5a..849f5b8 100644 --- a/lakeapi2sql/sql_connection.py +++ b/lakeapi2sql/sql_connection.py @@ -1,22 +1,36 @@ import lakeapi2sql._lowlevel as lvd from lakeapi2sql.utils import prepare_connection_string +from typing import TypedDict + + +class TdsColumn(TypedDict): + name: str + column_type: str + + +class TdsResult(TypedDict): + columns: list[TdsColumn] + rows: list[tuple] class TdsConnection: def __init__(self, connection_string: str, aad_token: str | None = None) -> None: - connection_string, aad_token = await prepare_connection_string(connection_string, aad_token) self._connection_string = connection_string self._aad_token = aad_token async def __aenter__(self) -> "TdsConnection": - self._connection = await lvd.connect_sql(self.connection_string, self.aad_token) + connection_string, aad_token = await prepare_connection_string(self._connection_string, self._aad_token) + + self._connection = await lvd.connect_sql(connection_string, aad_token) return self - async def __aexit__(self, exc_type, exc_value, traceback) -> None: + async def __aexit__(self, *args, **kwargs) -> None: pass - async def execute_sql(self, sql: str, arguments: list[str | int | float | bool | None]) -> list[int]: - return await lvd.execute_sql(self._connection, sql, arguments) + async def execute_sql(self, sql: str, arguments: list[str | int | float | bool | None] | None = None) -> list[int]: + return await lvd.execute_sql(self._connection, sql, arguments or []) - async def execute_sql_with_result(self, sql: str, arguments: list[str | int | float | bool | None]) -> list[int]: - return await lvd.execute_sql_with_result(self._connection, sql, arguments) + async def execute_sql_with_result( + self, sql: str, arguments: list[str | int | float | bool | None] | None = None + ) -> TdsResult: + return await lvd.execute_sql_with_result(self._connection, sql, arguments or []) diff --git a/pyproject.toml b/pyproject.toml index 86d9a9c..e326963 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "maturin" [project] name = "lakeapi2sql" requires-python = ">=3.10" -version = "0.9.0" +version = "0.9.1" classifiers = [ "Programming Language :: Rust", "Programming Language :: Python :: Implementation :: CPython", diff --git a/src/lib.rs b/src/lib.rs index 5e50ee0..e78f356 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@
use error::LakeApi2SqlError; use futures::{StreamExt, TryStreamExt}; use pyo3::exceptions::{PyConnectionError, PyIOError, PyTypeError}; use pyo3::prelude::*; -use pyo3::types::{PyDict, PyInt, PyList, PyString}; +use pyo3::types::{PyDict, PyInt, PyList, PyString, PyTuple}; mod arrow_convert; pub mod bulk_insert; pub mod connect; @@ -63,7 +63,7 @@ fn into_dict_result<'a>(py: Python<'a>, meta: Option, rows: Vec< let mut py_rows = PyList::new( py, rows.iter().map(|row| { - PyList::new( + PyTuple::new( py, row.cells() .map(|(c, val)| match val { @@ -244,7 +244,7 @@ impl ToSql for ValueWrap { fn to_exec_args(args: Vec<&PyAny>) -> Result<Vec<ValueWrap>, PyErr> { let mut res: Vec<ValueWrap> = Vec::new(); - for i in 0..args.len() - 1 { + for i in 0..args.len() { let x = args[i]; res.push(ValueWrap(if x.is_none() { Box::new(Option::::None) as Box @@ -280,27 +280,50 @@ fn execute_sql<'a>( list2 }); } + let nr_args = args.len(); let tds_args = to_exec_args(args)?; let mutex = conn.0.clone(); pyo3_asyncio::tokio::future_into_py(py, async move { - let res = mutex - .clone() - .lock() - .await - .execute( - query, - tds_args - .iter() - .map(|x| x.0.borrow() as &dyn ToSql) - .collect::<Vec<_>>() - .as_slice(), - ) - .await; + let res = if nr_args > 0 { + mutex + .clone() + .lock() + .await + .execute( + query, + tds_args + .iter() + .map(|x| x.0.borrow() as &dyn ToSql) + .collect::<Vec<_>>() + .as_slice(), + ) + .await + .map(|x| x.rows_affected().to_owned()) + } else { + let arc = mutex.clone(); + let lock = arc.lock(); + let mut conn = lock.await; + let res = conn.simple_query(query).await; + match res { + Ok(mut stream) => { + let mut row_count: u64 = 0; + while let Some(item) = stream.try_next().await.map_err(|er| { + PyErr::new::<PyIOError, _>(format!("Error executing: {er}")) + })? { + if let QueryItem::Row(_) = item { + row_count += 1; + } + } + Ok(vec![row_count]) + } + Err(a) => Err(a), + } + }; match res { Ok(re) => { - return Ok(into_list(re.rows_affected())); + return Ok(into_list(&re)); } Err(er) => Err(PyErr::new::<PyIOError, _>(format!("Error executing: {er}"))), } diff --git a/test/test_insert.py b/test/test_insert.py deleted file mode 100644 index 98b67f5..0000000 --- a/test/test_insert.py +++ /dev/null @@ -1,28 +0,0 @@ -import pyodbc -import pytest -import lakeapi2sql.bulk_insert -import os -from dotenv import load_dotenv - -load_dotenv() -## in order to use sql express, be sure to enable tcp for sql express and to start sql browser service -constr = "DRIVER=ODBC Driver 17 for SQL Server;" + os.getenv("SQL_CON_STR", "Server=tcp:localhost\\SQLExpress;database=lakeapi2sqltest;encrypt=no;IntegratedSecurity=yes;TrustServerCertificate=yes") -constr_master = constr.replace("{db}", "master") -constr_db = constr.replace("{db}", "lakeapi2sqltest") - - -async def test_bulk_insert(): - load_dotenv() - print(constr_db) - #with pyodbc.connect(constr_db) as db: - # db.execute("drop table if exists ##customer;create table ##customer(user_key nvarchar(100), employee_id nvarchar(100), last_name nvarchar(100), first_name nvarchar(100), login nvarchar(100), login_with_domain nvarchar(100), email nvarchar(100), language_code nvarchar(5), is_active bit,phone_nr_business nvarchar(100), mobile_nr_business nvarchar(100), update_date datetime2)") - print("before http call") - res = await lakeapi2sql.bulk_insert.insert_http_arrow_stream_to_sql(os.getenv("SQL_CON_STR", "Server=tcp:localhost\\SQLExpress;database=lakeapi2sqltest;encrypt=no;IntegratedSecurity=yes;TrustServerCertificate=true"), "##customer", os.environ["LAKE_API_URL"],
(os.environ["LAKE_API_USER"],os.environ["LAKE_API_PWD"])) - #db.execute("drop table if exists user_result") - #db.execute("SELECT * into user_result FROM ##user") - print(res) - -if __name__ == "__main__": - #pytest.main(["--capture=tee-sys"]) - import asyncio - asyncio.run(test_bulk_insert()) \ No newline at end of file diff --git a/test/test_insert_reader.py b/test/test_insert_reader.py deleted file mode 100644 index d17c5af..0000000 --- a/test/test_insert_reader.py +++ /dev/null @@ -1,48 +0,0 @@ -import pyarrow as pa -import pyodbc -import pytest -import lakeapi2sql.bulk_insert -import os -from dotenv import load_dotenv - -data = [pa.array([1, 2, 3, 4]), pa.array(["foo", "bar", "$ä,àE", None]), pa.array([True, None, False, True])] - - -batch = pa.record_batch(data, names=["f0", "f1", "f2"]) - -batchreader = pa.RecordBatchReader.from_batches(batch.schema, [batch]) -load_dotenv() -## in order to use sql express, be sure to enable tcp for sql express and to start sql browser service -constr = "DRIVER=ODBC Driver 17 for SQL Server;" + os.getenv( - "SQL_CON_STR", - "Server=tcp:localhost\\SQLExpress;database=lakeapi2sqltest;encrypt=no;IntegratedSecurity=yes;TrustServerCertificate=yes", -) -constr_master = constr.replace("{db}", "master") -constr_db = constr.replace("{db}", "lakeapi2sqltest") - - -async def test_bulk_insert(): - load_dotenv() - print(constr_db) - with pyodbc.connect(constr_db) as db: - db.execute("drop table if exists ##ft;create table ##ft(f0 bigint, f1 nvarchar(100), f2 bit)") - print("before insert") - res = await lakeapi2sql.bulk_insert.insert_record_batch_to_sql( - os.getenv( - "SQL_CON_STR", - "Server=tcp:localhost\\SQLExpress;database=lakeapi2sqltest;encrypt=no;IntegratedSecurity=yes;TrustServerCertificate=true", - ), - "##ft", - batchreader, - ["f0", "f1", "f2"], - ) - # db.execute("drop table if exists user_result") - # db.execute("SELECT * into user_result FROM ##user") - print(res) - - -if __name__ == "__main__": - # pytest.main(["--capture=tee-sys"]) - import asyncio - - asyncio.run(test_bulk_insert()) diff --git a/test_server/__init__.py b/test_server/__init__.py new file mode 100644 index 0000000..ea71cc3 --- /dev/null +++ b/test_server/__init__.py @@ -0,0 +1,50 @@ +from pathlib import Path +import docker +from docker.models.containers import Container +from time import sleep +from typing import cast +import docker.errors +import os + + +def _getenvs(): + envs = dict() + with open("test_server/sql_docker.env", "r") as f: + lines = f.readlines() + envs = { + item[0].strip(): item[1].strip() + for item in [line.split("=") for line in lines if len(line.strip()) > 0 and not line.startswith("#")] + } + return envs + + +def start_mssql_server() -> Container: + client = docker.from_env() # code taken from https://github.com/fsspec/adlfs/blob/main/adlfs/tests/conftest.py#L72 + sql_server: Container | None = None + try: + m = cast(Container, client.containers.get("test4sql_lakeapi2sql")) + if m.status == "running": + return m + else: + sql_server = m + except docker.errors.NotFound: + pass + + envs = _getenvs() + + if sql_server is None: + # using podman: podman run --env-file=TESTS/SQL_DOCKER.ENV --publish=1439:1433 --name=mssql1 chriseaton/adventureworks:light + # podman kill mssql1 + sql_server = client.containers.run( + "mcr.microsoft.com/mssql/server:2022-latest", + environment=envs, + detach=True, + name="test4sql_lakeapi2sql", + ports={"1433/tcp": "1444"}, + ) # type: ignore + assert sql_server is not None + sql_server.start() + print(sql_server.status) + sleep(15) 
+ print("Successfully created sql container...") + return sql_server diff --git a/test_server/sql_docker.env b/test_server/sql_docker.env new file mode 100644 index 0000000..f35257e --- /dev/null +++ b/test_server/sql_docker.env @@ -0,0 +1,4 @@ +SA_PASSWORD=MyPass@word4tests +ACCEPT_EULA=Y +MSSQL_PID=Express +MSSQL_SA_PASSWORD=MyPass@word4tests \ No newline at end of file diff --git a/test/__init__.py b/tests/__init__.py similarity index 100% rename from test/__init__.py rename to tests/__init__.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..fb1b643 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,95 @@ +import pytest +import os +import logging +from dotenv import load_dotenv +import pytest_asyncio + +load_dotenv() + +logger = logging.getLogger(__name__) + + +def _build_connection_string(conn_or_dict: str | dict): + if isinstance(conn_or_dict, str): + return conn_or_dict + conn_str = ";".join([f"{k}={v}" for k, v in conn_or_dict.items()]) + return conn_str + + +class DB_Connection: + def __init__(self): + import logging + + logging.getLogger("lakeapi2sql").setLevel(logging.DEBUG) + logging.getLogger("tiberius").setLevel(logging.DEBUG) + import shutil + + if os.path.exists("tests/_data"): + shutil.rmtree("tests/_data") + os.makedirs("tests/_data", exist_ok=True) + + conn_str = _build_connection_string( + os.getenv("TDS_MASTER_CONN", None) + or { + "server": "127.0.0.1,1444", + "database": "master", + "ENCRYPT": "yes", + "TrustServerCertificate": "Yes", + "UID": "sa", + "PWD": "MyPass@word4tests", + "MultipleActiveResultSets": "True", + } + ) + self.conn_str_master = conn_str + from lakeapi2sql import TdsConnection + + self.conn = TdsConnection(conn_str) + + self.conn_str = conn_str.replace("database=master", "database=lakesql_test").replace( + "Database=master", "Database=lakesql_test" + ) + if "lakesql_test" not in self.conn_str: + raise ValueError("Database not created correctly") + + async def __aenter__(self): + await self.conn.__aenter__() + try: + await self.conn.execute_sql(" drop DATABASE if exists lakesql_test") + await self.conn.execute_sql("CREATE DATABASE lakesql_test") + except Exception as e: + logger.error("Error drop creating db", exc_info=e) + await self.conn.execute_sql("USE lakesql_test") + with open("tests/sqls/init.sql", encoding="utf-8-sig") as f: + sqls = f.read().replace("\r\n", "\n").split("\nGO\n") + for sql in sqls: + await self.conn.execute_sql(sql) + return self + + async def __aexit__(self, *args, **kwargs): + await self.conn.__aexit__(*args, **kwargs) + pass + + def new_connection(self): + from lakeapi2sql import TdsConnection + + return TdsConnection(self.conn_str) + + +@pytest.fixture(scope="session") +def spawn_sql(): + import test_server + import os + + if os.getenv("NO_SQL_SERVER", "0") == "1": + yield None + else: + sql_server = test_server.start_mssql_server() + yield sql_server + if os.getenv("KEEP_SQL_SERVER", "0") == "0": # can be handy during development + sql_server.stop() + + +@pytest_asyncio.fixture(scope="session") +async def connection(spawn_sql): + async with DB_Connection() as c: + yield c diff --git a/tests/data/delta-table/_delta_log/00000000000000000000.json b/tests/data/delta-table/_delta_log/00000000000000000000.json new file mode 100644 index 0000000..e18abb5 --- /dev/null +++ b/tests/data/delta-table/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"protocol":{"minReaderVersion":1,"minWriterVersion":1}} 
+{"metaData":{"id":"0e807d97-2895-4830-a172-eb9f06e23944","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"num\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"letter\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1698613084286,"configuration":{}}} +{"add":{"path":"part-00001-e2c7edd4-83e0-4c67-8190-0b6ba55bc405-c000.snappy.parquet","size":795,"partitionValues":{},"modificationTime":1698613084290,"dataChange":true,"stats":"{\"numRecords\":3,\"minValues\":{\"letter\":\"a\",\"num\":1},\"maxValues\":{\"num\":3,\"letter\":\"c\"},\"nullCount\":{\"letter\":0,\"num\":0}}"}} +{"commitInfo":{"timestamp":1698613084290,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.16.2"}} \ No newline at end of file diff --git a/tests/data/delta-table/_delta_log/00000000000000000001.json b/tests/data/delta-table/_delta_log/00000000000000000001.json new file mode 100644 index 0000000..629322e --- /dev/null +++ b/tests/data/delta-table/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00000-1a7ca5ae-c5c6-49e8-b3cc-5554600b8447-c000.snappy.parquet","size":795,"partitionValues":{},"modificationTime":1698613119696,"dataChange":true,"stats":"{\"numRecords\":3,\"minValues\":{\"num\":77,\"letter\":\"x\"},\"maxValues\":{\"letter\":\"z\",\"num\":99},\"nullCount\":{\"num\":0,\"letter\":0}}"}} +{"commitInfo":{"timestamp":1698613119696,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.16.2"}} \ No newline at end of file diff --git a/tests/data/delta-table/part-00000-1a7ca5ae-c5c6-49e8-b3cc-5554600b8447-c000.snappy.parquet b/tests/data/delta-table/part-00000-1a7ca5ae-c5c6-49e8-b3cc-5554600b8447-c000.snappy.parquet new file mode 100644 index 0000000..f15141a Binary files /dev/null and b/tests/data/delta-table/part-00000-1a7ca5ae-c5c6-49e8-b3cc-5554600b8447-c000.snappy.parquet differ diff --git a/tests/data/delta-table/part-00001-e2c7edd4-83e0-4c67-8190-0b6ba55bc405-c000.snappy.parquet b/tests/data/delta-table/part-00001-e2c7edd4-83e0-4c67-8190-0b6ba55bc405-c000.snappy.parquet new file mode 100644 index 0000000..11cb408 Binary files /dev/null and b/tests/data/delta-table/part-00001-e2c7edd4-83e0-4c67-8190-0b6ba55bc405-c000.snappy.parquet differ diff --git a/tests/data/faker/_delta_log/00000000000000000000.json b/tests/data/faker/_delta_log/00000000000000000000.json new file mode 100644 index 0000000..84c2d6d --- /dev/null +++ b/tests/data/faker/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}} 
+{"metaData":{"id":"1434707f-5f70-498a-841e-79fa6031481c","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"address\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"text\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"nbr\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"inie\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"datetime_ntz\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}},{\"name\":\"datetime_tz\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1714478487570,"configuration":{}}} +{"add":{"path":"part-00001-4cfc750a-f3ef-4e2b-b614-896fb16d93d2-c000.zstd.parquet","partitionValues":{},"size":1720640,"modificationTime":1714478487640,"dataChange":true,"stats":"{\"numRecords\":9999,\"minValues\":{\"date\":\"1970-01-06\",\"name\":\"Aaron Flores\",\"nbr\":-99999793004600.2,\"datetime_tz\":\"1970-01-03T08:53:11Z\",\"text\":\"A accusamus numquam ullam. Nihil placeat consequuntur corporis quod expedita beatae tempore. Asperiores vel similique laboriosam.\",\"inie\":0,\"datetime_ntz\":\"1970-01-04T00:47:45Z\",\"address\":\"000 Andrew Brooks Apt. 135\\nBernardbury, TX 69392\",\"id\":1},\"maxValues\":{\"id\":9999,\"address\":\"鹿児島県鴨川市戸島18丁目22番16号\",\"nbr\":99911876370940.1,\"date\":\"2024-04-29\",\"inie\":9997,\"name\":\"高橋 香織\",\"datetime_ntz\":\"2024-04-27T10:08:45Z\",\"datetime_tz\":\"2024-04-29T05:44:55Z\",\"text\":\"高い電池極端なそれデフォルトスペル自体残る。ベルベットサンプルダニ転倒保証金索引溝。\\n拡張ログ織る尊敬する。部隊評議会コンペ呼ぶ。目的人形楽しんで改善差別する。\\n探査省略厳しい再現するトーン意図。\\n運戦略的腐った本質的な敵舗装持つ。装置ヘアベルベットクロス副舗装。軸残るバーゲン保持する風景副。\"},\"nullCount\":{\"nbr\":0,\"inie\":0,\"date\":0,\"address\":0,\"text\":0,\"datetime_tz\":0,\"id\":0,\"datetime_ntz\":0,\"name\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1714478487644,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists"},"clientVersion":"delta-rs.0.17.1"}} \ No newline at end of file diff --git a/tests/data/faker/part-00001-4cfc750a-f3ef-4e2b-b614-896fb16d93d2-c000.zstd.parquet b/tests/data/faker/part-00001-4cfc750a-f3ef-4e2b-b614-896fb16d93d2-c000.zstd.parquet new file mode 100644 index 0000000..bb91c9a Binary files /dev/null and b/tests/data/faker/part-00001-4cfc750a-f3ef-4e2b-b614-896fb16d93d2-c000.zstd.parquet differ diff --git a/tests/data/user2/_delta_log/00000000000000000000.json b/tests/data/user2/_delta_log/00000000000000000000.json new file mode 100644 index 0000000..588c073 --- /dev/null +++ b/tests/data/user2/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}} 
+{"metaData":{"id":"468591ab-6c32-406e-9807-b88a52f7c08a","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"User_-_iD\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"FirstName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"LastName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"Age\",\"type\":\"decimal(15,3)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"companyid\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"time_stamp\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"__timestamp\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}},{\"name\":\"__is_deleted\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"__is_full_load\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1713536760218,"configuration":{}}} +{"add":{"path":"part-00001-ab3f2a71-0260-4560-96f6-85e7dccbeb5a-c000.zstd.parquet","partitionValues":{},"size":3414,"modificationTime":1713536760231,"dataChange":true,"stats":"{\"numRecords\":3,\"minValues\":{\"companyid\":\"c1\",\"time_stamp\":2004,\"__timestamp\":\"2024-04-19T14:25:59.946667Z\",\"Age\":14.0,\"FirstName\":\"John\",\"__is_deleted\":false,\"User_-_iD\":1,\"__is_full_load\":true,\"LastName\":\"Anders\"},\"maxValues\":{\"Age\":24.0,\"User_-_iD\":3,\"LastName\":\"wayne\",\"time_stamp\":2006,\"__is_full_load\":true,\"companyid\":\"c1\",\"__timestamp\":\"2024-04-19T14:25:59.946667Z\",\"__is_deleted\":false,\"FirstName\":\"Petra\"},\"nullCount\":{\"Age\":0,\"companyid\":0,\"User_-_iD\":0,\"__timestamp\":0,\"FirstName\":0,\"time_stamp\":0,\"__is_full_load\":0,\"__is_deleted\":0,\"LastName\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1713536760232,"operation":"WRITE","operationParameters":{"mode":"Overwrite"},"clientVersion":"delta-rs.0.17.1"}} \ No newline at end of file diff --git a/tests/data/user2/_delta_log/00000000000000000001.json b/tests/data/user2/_delta_log/00000000000000000001.json new file mode 100644 index 0000000..ff05455 --- /dev/null +++ b/tests/data/user2/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-7e6179cf-46e2-4a0e-899f-969998adbc62-c000.zstd.parquet","partitionValues":{},"size":3455,"modificationTime":1713536764336,"dataChange":true,"stats":"{\"numRecords\":3,\"minValues\":{\"Age\":24.0,\"User_-_iD\":3,\"companyid\":\"c1\",\"time_stamp\":2013,\"LastName\":\"Meier\",\"__is_deleted\":false,\"__timestamp\":\"2024-04-19T14:26:03.890Z\",\"FirstName\":\"Heiri\",\"__is_full_load\":false},\"maxValues\":{\"User_-_iD\":5,\"__is_deleted\":false,\"__timestamp\":\"2024-04-19T14:26:03.890Z\",\"time_stamp\":2015,\"__is_full_load\":false,\"FirstName\":\"Petra\",\"Age\":27.98,\"LastName\":\"wayne-hösch\",\"companyid\":\"c2\"},\"nullCount\":{\"time_stamp\":0,\"FirstName\":0,\"User_-_iD\":0,\"__timestamp\":0,\"Age\":0,\"LastName\":0,\"__is_full_load\":0,\"__is_deleted\":0,\"companyid\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1713536764337,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.17.1"}} \ No newline at end of file diff --git a/tests/data/user2/part-00001-7e6179cf-46e2-4a0e-899f-969998adbc62-c000.zstd.parquet 
b/tests/data/user2/part-00001-7e6179cf-46e2-4a0e-899f-969998adbc62-c000.zstd.parquet new file mode 100644 index 0000000..a9825ba Binary files /dev/null and b/tests/data/user2/part-00001-7e6179cf-46e2-4a0e-899f-969998adbc62-c000.zstd.parquet differ diff --git a/tests/data/user2/part-00001-ab3f2a71-0260-4560-96f6-85e7dccbeb5a-c000.zstd.parquet b/tests/data/user2/part-00001-ab3f2a71-0260-4560-96f6-85e7dccbeb5a-c000.zstd.parquet new file mode 100644 index 0000000..2867e3e Binary files /dev/null and b/tests/data/user2/part-00001-ab3f2a71-0260-4560-96f6-85e7dccbeb5a-c000.zstd.parquet differ diff --git a/tests/sqls/init.sql b/tests/sqls/init.sql new file mode 100644 index 0000000..c75a3ac --- /dev/null +++ b/tests/sqls/init.sql @@ -0,0 +1,154 @@ +drop table if exists dbo.[user2$]; +drop table if exists dbo.[user3]; +drop table if exists dbo.[user3_]; +drop table if exists dbo.[user4]; +drop table if exists dbo.[user5]; +drop table if exists dbo.[user]; +drop table if exists dbo.[company]; +drop table if exists dbo.[company2]; +drop table if exists dbo.[company3]; +drop table if exists [long schema].[long table name]; +drop table if exists dbo.[log]; + +drop view if exists [long schema].[long table name_as_view]; +GO +create table dbo.[log] (id int primary key identity(1, 1), message nvarchar(max), [inserted_at] datetime not null default(getdate())); +create table dbo.[company]( + id varchar(10) collate Icelandic_100_CI_AI_SC primary key, + name varchar(100), + SysStartTime datetime2 GENERATED ALWAYS AS ROW START, + SysEndTime datetime2 GENERATED ALWAYS AS ROW + END, + PERIOD FOR SYSTEM_TIME(SysStartTime, SysEndTime) +); +create table dbo.[company2]( + id varchar(10) collate Icelandic_100_CI_AI_SC primary key, + name varchar(100), + SysStartTime datetime2 GENERATED ALWAYS AS ROW START, + SysEndTime datetime2 GENERATED ALWAYS AS ROW + END, + PERIOD FOR SYSTEM_TIME(SysStartTime, SysEndTime) +); +create table dbo.[company3]( + id varchar(10) collate Icelandic_100_CI_AI_SC primary key, + name varchar(100), + [Start] datetime2 GENERATED ALWAYS AS ROW START, + [End] datetime2 GENERATED ALWAYS AS ROW + END, + PERIOD FOR SYSTEM_TIME([Start], [End]) +); +insert into dbo.[company](id, name) +select 'c1', + 'The First company'; +insert into dbo.[company](id, name) +select 'c2', + 'The Second company '; +insert into dbo.[company3](id, name) +select id, name from dbo.[company]; +create table dbo.[user]( + [User - iD] bigint primary key identity(1, 1), + FirstName varchar(100), + LastName nvarchar(max), + Age decimal(15, 3), + companyid varchar(10) collate Icelandic_100_CI_AI_SC not null references dbo.company(id), + [time stamp] timestamp +); +create table dbo.[user2$]( + [User - iD] bigint primary key identity(1, 1), + FirstName varchar(100), + LastName nvarchar(max), + Age decimal(15, 3), + companyid varchar(10) collate Icelandic_100_CI_AI_SC not null references dbo.company(id), + [time stamp] timestamp +); +create table dbo.[user3]( + [User - iD] bigint primary key identity(1, 1), + FirstName varchar(100), + LastName nvarchar(max), + Age decimal(15, 3), + companyid varchar(10) collate Icelandic_100_CI_AI_SC not null references dbo.company(id), + [time stamp] timestamp +); +create table dbo.[user4]( + [User - iD] bigint primary key identity(1, 1), + FirstName varchar(100), + LastName nvarchar(max), + Age decimal(15, 3), + companyid varchar(10) collate Icelandic_100_CI_AI_SC not null references dbo.company(id), + [time stamp] timestamp +); +create table dbo.[user5]( + [User - iD] bigint primary key 
identity(1, 1), + FirstName varchar(100), + LastName nvarchar(max), + Age decimal(15, 3), + companyid varchar(10) collate Icelandic_100_CI_AI_SC not null references dbo.company(id), + [time stamp] timestamp +); +insert into dbo.[user](FirstName, LastName, Age, companyid) +select * +FROM ( + VALUES('John', 'Anders', 14, 'c1'), + ('Peter', 'Johniingham', 23, 'c1'), + ('Petra', 'wayne', 24, 'c1') + ) as x(fn, ln, a, ci); +; +insert into dbo.[user2$](FirstName, LastName, Age, companyid) +select FirstName, + LastName, + Age, + companyid +from dbo.[user]; +insert into dbo.[user3](FirstName, LastName, Age, companyid) +select FirstName, + LastName, + Age, + companyid +from dbo.[user]; +insert into dbo.[user4](FirstName, LastName, Age, companyid) +select FirstName, + LastName, + Age, + companyid +from dbo.[user]; +insert into dbo.[user5](FirstName, LastName, Age, companyid) +select FirstName, + LastName, + Age, + companyid +from dbo.[user]; +GO +IF NOT EXISTS( + Select * + from sys.schemas + where name = 'long schema' +) begin exec sp_executesql N'CREATE SCHEMA [long schema]' +end; +GO +IF NOT EXISTS( + Select * + from sys.schemas + where name = 'lake_import' +) begin exec sp_executesql N'CREATE SCHEMA lake_import' +end; +GO +CREATE TABLE [long schema].[long table name] ( + [long column name] int, + dt xml, + uid uniqueidentifier default newid(), + [date] date +); +GO +INSERT INTO [long schema].[long table name] ([long column name], dt, [date]) +SELECT 1, + 'text', + '2023-01-01' +union all +SELECT 2, + 'text 2345asdf', + '2024-01-01'; +; +insert into dbo.[log](message) +select 'The first log message'; +GO +create view [long schema].[long table name_as_view] as select * from [long schema].[long table name]; \ No newline at end of file diff --git a/tests/test_insert_simple.py b/tests/test_insert_simple.py new file mode 100644 index 0000000..eaad64c --- /dev/null +++ b/tests/test_insert_simple.py @@ -0,0 +1,33 @@ +from typing import TYPE_CHECKING +import pytest + +if TYPE_CHECKING: + from .conftest import DB_Connection + + +@pytest.mark.asyncio +async def test_insert_simple(connection: "DB_Connection"): + import pyarrow as pa + from lakeapi2sql.bulk_insert import insert_record_batch_to_sql + + data = [pa.array([1, 2, 3, 4]), pa.array(["foo", "bar", "$ä,àE", None]), pa.array([True, None, False, True])] + + batch = pa.record_batch(data, names=["f0", "f1", "f2"]) + + batchreader = pa.RecordBatchReader.from_batches(batch.schema, [batch]) + async with connection.new_connection() as con: + await con.execute_sql( + "drop table if exists dbo.test1;create table dbo.test1(f0 bigint, f1 nvarchar(100), f2 bit)" + ) + + await insert_record_batch_to_sql( + connection.conn_str, + "dbo.test1", + batchreader, + ["f0", "f1", "f2"], + ) + async with connection.new_connection() as con: + res = await con.execute_sql_with_result("select * from dbo.test1") + print(res["columns"]) + assert [c["name"] for c in res["columns"]] == ["f0", "f1", "f2"] + assert res["rows"] == [(1, "foo", True), (2, "bar", None), (3, "$ä,àE", False), (4, None, True)]
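
Reviewer note: below is a minimal usage sketch of the Python API this PR touches, not part of the change itself. It assumes the connection-string defaults from tests/conftest.py (`sa` / `MyPass@word4tests` on port 1444) and a throwaway table name `dbo.demo`; adjust both for your environment. It illustrates that `execute_sql` without arguments is routed through the new `simple_query` branch in src/lib.rs, that passing arguments uses the parameterized `execute` branch, and that `execute_sql_with_result` returns a `TdsResult` whose rows come back as tuples after the `PyList` to `PyTuple` change.

```python
import asyncio

from lakeapi2sql import TdsConnection


async def main() -> None:
    # Placeholder connection string matching the defaults used in tests/conftest.py.
    conn_str = (
        "server=127.0.0.1,1444;database=master;UID=sa;PWD=MyPass@word4tests;"
        "encrypt=yes;TrustServerCertificate=yes"
    )
    async with TdsConnection(conn_str) as con:
        # No arguments -> executed via the simple_query branch added in src/lib.rs.
        await con.execute_sql(
            "drop table if exists dbo.demo; create table dbo.demo(id int, name nvarchar(50))"
        )
        # With arguments -> executed via the parameterized execute branch
        # (@P1, @P2 are the positional TDS placeholders).
        await con.execute_sql("insert into dbo.demo(id, name) values (@P1, @P2)", [1, "foo"])

        res = await con.execute_sql_with_result("select id, name from dbo.demo")
        print([c["name"] for c in res["columns"]])  # ['id', 'name']
        print(res["rows"])  # rows are tuples, e.g. [(1, 'foo')]


if __name__ == "__main__":
    asyncio.run(main())
```

For running the new test suite locally: tests/conftest.py starts a `mcr.microsoft.com/mssql/server:2022-latest` container on port 1444 via test_server; set `NO_SQL_SERVER=1` to skip the container (for example when `TDS_MASTER_CONN` points at an existing server) and `KEEP_SQL_SERVER=1` to leave the container running between runs.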