Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(backends): clean up resources produced by memtable #10055

Merged
merged 15 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import abc
import collections.abc
import contextlib
import functools
import importlib.metadata
import keyword
Expand Down Expand Up @@ -1116,13 +1117,27 @@
for memtable in expr.op().find(ops.InMemoryTable):
if not self._in_memory_table_exists(memtable.name):
self._register_in_memory_table(memtable)
weakref.finalize(
memtable, self._finalize_in_memory_table, memtable.name
)

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
if self.supports_in_memory_tables:
raise NotImplementedError(
f"{self.name} must implement `_register_in_memory_table` to support in-memory tables"
)

def _finalize_in_memory_table(self, name: str) -> None:
"""Wrap `_finalize_memtable` to suppress exceptions."""
with contextlib.suppress(Exception):
self._finalize_memtable(name)

def _finalize_memtable(self, name: str) -> None:
if self.supports_in_memory_tables:

Check warning on line 1136 in ibis/backends/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/__init__.py#L1136

Added line #L1136 was not covered by tests
raise NotImplementedError(
f"{self.name} must implement `_finalize_memtable` to support in-memory tables"
)

def _run_pre_execute_hooks(self, expr: ir.Expr) -> None:
"""Backend-specific hooks to run before an expression is executed."""
self._register_udfs(expr)
Expand Down
11 changes: 11 additions & 0 deletions ibis/backends/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,17 @@
else:
return True

def _finalize_memtable(self, name: str) -> None:
session_dataset = self._session_dataset
table_id = sg.table(

Check warning on line 185 in ibis/backends/bigquery/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/bigquery/__init__.py#L184-L185

Added lines #L184 - L185 were not covered by tests
name,
db=session_dataset.dataset_id,
catalog=session_dataset.project,
quoted=False,
)
drop_sql_stmt = sge.Drop(kind="TABLE", this=table_id, exists=True)
self.raw_sql(drop_sql_stmt)

Check warning on line 192 in ibis/backends/bigquery/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/bigquery/__init__.py#L191-L192

Added lines #L191 - L192 were not covered by tests

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
session_dataset = self._session_dataset

Expand Down
13 changes: 5 additions & 8 deletions ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import os
import urllib
import warnings
import weakref
from operator import itemgetter
from pathlib import Path
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -160,12 +159,9 @@ def create_table(
properties.append(sge.TemporaryProperty())
catalog = "temp"

temp_memtable_view = None

if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand Down Expand Up @@ -234,9 +230,6 @@ def create_table(
).sql(self.name)
)

if temp_memtable_view is not None:
self.con.unregister(temp_memtable_view)

return self.table(name, database=(catalog, database))

def table(
Expand Down Expand Up @@ -1620,11 +1613,15 @@ def _in_memory_table_exists(self, name: str) -> bool:
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
self.con.register(op.name, op.data.to_pyarrow(op.schema))

def _finalize_memtable(self, name: str) -> None:
# if we don't aggressively unregister tables duckdb will keep a
# reference to every memtable ever registered, even if there's no
# way for a user to access the operation anymore, resulting in a
# memory leak
weakref.finalize(op, self.con.unregister, op.name)
#
# we can't use drop_table, because self.con.register creates a view, so
# use the corresponding unregister method
self.con.unregister(name)

def _register_udfs(self, expr: ir.Expr) -> None:
con = self.con
Expand Down
29 changes: 12 additions & 17 deletions ibis/backends/exasol/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import atexit
import contextlib
import datetime
import re
Expand Down Expand Up @@ -42,7 +41,6 @@ class Backend(SQLBackend, CanCreateDatabase, CanCreateSchema):
compiler = sc.exasol.compiler
supports_temporary_tables = False
supports_create_or_replace = False
supports_in_memory_tables = False
supports_python_udfs = False

@property
Expand Down Expand Up @@ -278,14 +276,15 @@ def process_item(item: Any):
with self._safe_raw_sql(create_stmt_sql):
if not df.empty:
self.con.ext.insert_multi(name, rows)
atexit.register(self._clean_up_tmp_table, ident)

def _clean_up_tmp_table(self, ident: sge.Identifier) -> None:
with self._safe_raw_sql(
sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True)
):
def _clean_up_tmp_table(self, name: str) -> None:
ident = sg.to_identifier(name, quoted=self.compiler.quoted)
sql = sge.Drop(kind="TABLE", this=ident, exists=True, cascade=True)
with self._safe_raw_sql(sql):
pass

_finalize_memtable = _clean_up_tmp_table

def create_table(
self,
name: str,
Expand Down Expand Up @@ -334,11 +333,9 @@ def create_table(

quoted = self.compiler.quoted

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand All @@ -356,31 +353,29 @@ def create_table(
if not schema:
schema = table.schema()

table = sg.table(temp_name, catalog=database, quoted=quoted)
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
table_expr = sg.table(temp_name, catalog=database, quoted=quoted)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed these variable names because table = <not the memtable> causes it to get GC'd. I gave another example of this in the PR description.

target = sge.Schema(
this=table_expr, expressions=schema.to_sqlglot(self.dialect)
)

create_stmt = sge.Create(kind="TABLE", this=target)

this = sg.table(name, catalog=database, quoted=quoted)
with self._safe_raw_sql(create_stmt):
if query is not None:
self.con.execute(
sge.Insert(this=table, expression=query).sql(self.name)
sge.Insert(this=table_expr, expression=query).sql(self.name)
)

if overwrite:
self.con.execute(
sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name)
)
self.con.execute(
f"RENAME TABLE {table.sql(self.name)} TO {this.sql(self.name)}"
f"RENAME TABLE {table_expr.sql(self.name)} TO {this.sql(self.name)}"
)

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)
return self.table(name, database=database)

# preserve the input schema if it was provided
Expand Down
19 changes: 8 additions & 11 deletions ibis/backends/mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,11 +625,9 @@ def create_table(
properties.append(sge.TemporaryProperty())
catalog, db = None, None

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand All @@ -647,19 +645,22 @@ def create_table(
if not schema:
schema = table.schema()

table = sg.table(
"#" * temp + temp_name, catalog=catalog, db=db, quoted=self.compiler.quoted
)
quoted = self.compiler.quoted
raw_table = sg.table(temp_name, catalog=catalog, db=db, quoted=False)
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
target = sge.Schema(
this=sg.table(
"#" * temp + temp_name, catalog=catalog, db=db, quoted=quoted
),
expressions=schema.to_sqlglot(self.dialect),
)

create_stmt = sge.Create(
kind="TABLE",
this=target,
properties=sge.Properties(expressions=properties),
)

this = sg.table(name, catalog=catalog, db=db, quoted=self.compiler.quoted)
this = sg.table(name, catalog=catalog, db=db, quoted=quoted)
raw_this = sg.table(name, catalog=catalog, db=db, quoted=False)
with self._safe_ddl(create_stmt) as cur:
if query is not None:
Expand Down Expand Up @@ -692,10 +693,6 @@ def create_table(
db = "dbo"

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)
return self.table(name, database=(catalog, db))

# preserve the input schema if it was provided
Expand Down
19 changes: 15 additions & 4 deletions ibis/backends/mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,10 @@ def create_table(
if not schema:
schema = table.schema()

table = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
target = sge.Schema(this=table, expressions=schema.to_sqlglot(self.dialect))
table_expr = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
target = sge.Schema(
this=table_expr, expressions=schema.to_sqlglot(self.dialect)
)

create_stmt = sge.Create(
kind="TABLE",
Expand All @@ -437,15 +439,17 @@ def create_table(
this = sg.table(name, catalog=database, quoted=self.compiler.quoted)
with self._safe_raw_sql(create_stmt) as cur:
if query is not None:
insert_stmt = sge.Insert(this=table, expression=query).sql(self.name)
insert_stmt = sge.Insert(this=table_expr, expression=query).sql(
self.name
)
cur.execute(insert_stmt)

if overwrite:
cur.execute(
sge.Drop(kind="TABLE", this=this, exists=True).sql(self.name)
)
cur.execute(
f"ALTER TABLE IF EXISTS {table.sql(self.name)} RENAME TO {this.sql(self.name)}"
f"ALTER TABLE IF EXISTS {table_expr.sql(self.name)} RENAME TO {this.sql(self.name)}"
)

if schema is None:
Expand Down Expand Up @@ -538,3 +542,10 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
raise
df = MySQLPandasData.convert_table(df, schema)
return df

def _finalize_memtable(self, name: str) -> None:
"""No-op.

Executing **any** SQL in a finalizer causes the underlying connection
socket to be set to `None`. It is unclear why this happens.
"""
23 changes: 10 additions & 13 deletions ibis/backends/oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import atexit
import contextlib
import re
import warnings
Expand Down Expand Up @@ -419,11 +418,9 @@ def create_table(
if temp:
properties.append(sge.TemporaryProperty())

temp_memtable_view = None
if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
temp_memtable_view = table.op().name
else:
table = obj

Expand Down Expand Up @@ -468,10 +465,6 @@ def create_table(
)

if schema is None:
# Clean up temporary memtable if we've created one
# for in-memory reads
if temp_memtable_view is not None:
self.drop_table(temp_memtable_view)
return self.table(name, database=database)

# preserve the input schema if it was provided
Expand Down Expand Up @@ -527,8 +520,6 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
insert_stmt, list(data.iloc[start:end].itertuples(index=False))
)

atexit.register(self._clean_up_tmp_table, name)

def _get_schema_using_query(self, query: str) -> sch.Schema:
name = util.gen_name("oracle_metadata")
dialect = self.name
Expand Down Expand Up @@ -608,6 +599,13 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
return OraclePandasData.convert_table(df, schema)

def _clean_up_tmp_table(self, name: str) -> None:
dialect = self.dialect

ident = sg.to_identifier(name, quoted=self.compiler.quoted)

truncate = sge.TruncateTable(expressions=[ident]).sql(dialect)
drop = sge.Drop(kind="TABLE", this=ident).sql(dialect)

with self.begin() as bind:
# global temporary tables cannot be dropped without first truncating them
#
Expand All @@ -616,9 +614,8 @@ def _clean_up_tmp_table(self, name: str) -> None:
# ignore DatabaseError exceptions because the table may not exist
# because it's already been deleted
with contextlib.suppress(oracledb.DatabaseError):
bind.execute(f'TRUNCATE TABLE "{name}"')
bind.execute(truncate)
with contextlib.suppress(oracledb.DatabaseError):
bind.execute(f'DROP TABLE "{name}"')
bind.execute(drop)

def _drop_cached_table(self, name):
self._clean_up_tmp_table(name)
_finalize_memtable = _drop_cached_table = _clean_up_tmp_table
3 changes: 3 additions & 0 deletions ibis/backends/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ def execute(self, query, params=None, limit="default", **kwargs):
def _create_cached_table(self, name, expr):
return self.create_table(name, expr.execute())

def _finalize_memtable(self, name: str) -> None:
"""No-op, let Python handle clean up."""


@lazy_singledispatch
def _convert_object(obj: Any, _conn):
Expand Down
3 changes: 3 additions & 0 deletions ibis/backends/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ def _in_memory_table_exists(self, name: str) -> bool:
def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
self._add_table(op.name, op.data.to_polars(op.schema).lazy())

def _finalize_memtable(self, name: str) -> None:
self.drop_table(name, force=True)

@deprecated(
as_of="9.1",
instead="use the explicit `read_*` method for the filetype you are trying to read, e.g., read_parquet, read_csv, etc.",
Expand Down
Loading