From 41df41027362bfea82b87a5a6967d898a792bd5f Mon Sep 17 00:00:00 2001
From: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Date: Thu, 17 Aug 2023 07:25:23 -0400
Subject: [PATCH] refactor(snowflake): replace custom temp table ddl for
 memtables with `read_parquet`

---
 ibis/backends/snowflake/__init__.py | 44 +----------------------------
 1 file changed, 1 insertion(+), 43 deletions(-)

diff --git a/ibis/backends/snowflake/__init__.py b/ibis/backends/snowflake/__init__.py
index 94976d9aefd3..802cac6aa378 100644
--- a/ibis/backends/snowflake/__init__.py
+++ b/ibis/backends/snowflake/__init__.py
@@ -434,18 +434,9 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
         import pyarrow.parquet as pq
 
         raw_name = op.name
-        table = self._quote(raw_name)
-
-        current_db = self.current_database
-        current_schema = self.current_schema
-        ident = f"{self._quote(current_db)}.{self._quote(current_schema)}.{table}"
 
         with self.begin() as con:
             if con.exec_driver_sql(f"SHOW TABLES LIKE '{raw_name}'").scalar() is None:
-                # 1. create a temporary stage for holding parquet files
-                stage = util.gen_name("stage")
-                con.exec_driver_sql(f"CREATE TEMP STAGE {stage}")
-
                 tmpdir = tempfile.TemporaryDirectory()
                 try:
                     path = os.path.join(tmpdir.name, f"{raw_name}.parquet")
@@ -454,44 +445,11 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
                     pq.write_table(
                         op.data.to_pyarrow(schema=op.schema), path, compression="zstd"
                     )
-
-                    # 2. copy the parquet file into the stage
-                    #
-                    # disable the automatic compression to gzip because we've
-                    # already compressed the data with zstd
-                    #
-                    # 99 is the limit on the number of threads use to upload data,
-                    # who knows why?
-                    con.exec_driver_sql(
-                        f"""
-                        PUT 'file://{path}' @{stage}
-                        PARALLEL = {min((os.cpu_count() or 2) // 2, 99)}
-                        AUTO_COMPRESS = FALSE
-                        """
-                    )
+                    self.read_parquet(path, table_name=raw_name)
                 finally:
                     with contextlib.suppress(Exception):
                         shutil.rmtree(tmpdir.name)
 
-                # 3. create a temporary table
-                schema = ", ".join(
-                    f"{self._quote(col)} {SnowflakeType.to_string(typ) + ' NOT NULL' * (not typ.nullable)}"
-                    for col, typ in op.schema.items()
-                )
-                con.exec_driver_sql(f"CREATE TEMP TABLE {ident} ({schema})")
-                # 4. copy the data into the table
-                columns = op.schema.names
-                column_names = ", ".join(map(self._quote, columns))
-                parquet_column_names = ", ".join(f"$1:{col}" for col in columns)
-                con.exec_driver_sql(
-                    f"""
-                    COPY INTO {ident} ({column_names})
-                    FROM (SELECT {parquet_column_names} FROM @{stage})
-                    FILE_FORMAT = (TYPE = PARQUET COMPRESSION = AUTO)
-                    PURGE = TRUE
-                    """
-                )
-
     def _get_temp_view_definition(
         self, name: str, definition: sa.sql.compiler.Compiled
     ) -> str:
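
Note: the code path this patch adopts can be sketched in isolation. The
deleted CREATE TEMP STAGE / PUT / CREATE TEMP TABLE / COPY INTO sequence
collapses into a single `read_parquet` call, which, from the diff, appears
to centralize the staging logic in one place. Below is a minimal sketch of
that pattern, assuming a reachable Snowflake connection; the connection
parameters, table name, and data are illustrative and not part of the patch.

    import os
    import tempfile

    import pyarrow as pa
    import pyarrow.parquet as pq

    import ibis

    # Placeholder credentials: substitute real values.
    con = ibis.snowflake.connect(
        user="me", password="...", account="my_account", database="MYDB/MYSCHEMA"
    )

    data = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})

    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "my_memtable.parquet")
        # zstd mirrors the compression chosen in _register_in_memory_table.
        pq.write_table(data, path, compression="zstd")
        # One call replaces the hand-rolled stage + COPY INTO DDL that the
        # hunks above delete.
        t = con.read_parquet(path, table_name="my_memtable")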