Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(caching): tie lifetime of cached tables to python refs #9477

Merged
merged 15 commits into from
Jul 2, 2024
Merged
1 change: 0 additions & 1 deletion conda/environment-arm64-flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ dependencies:
# runtime dependencies
- python =3.10
- atpublic >=2.3
- bidict >=0.22.1
- black >=22.1.0,<25
- clickhouse-connect >=0.5.23
- dask >=2022.9.1
Expand Down
1 change: 0 additions & 1 deletion conda/environment-arm64.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ dependencies:
# runtime dependencies
- python >=3.10
- atpublic >=2.3
- bidict >=0.22.1
- black >=22.1.0,<25
- clickhouse-connect >=0.5.23
- dask >=2022.9.1
Expand Down
1 change: 0 additions & 1 deletion conda/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ dependencies:
# runtime dependencies
- apache-flink
- atpublic >=2.3
- bidict >=0.22.1
- black >=22.1.0,<25
- clickhouse-connect >=0.5.23
- dask >=2022.9.1
Expand Down
1 change: 0 additions & 1 deletion docs/posts/run-on-snowflake/index.qmd
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ session.sproc.register(
"snowflake-snowpark-python",
"toolz",
"atpublic",
"bidict",
"pyarrow",
"pandas",
"numpy",
Expand Down
7 changes: 3 additions & 4 deletions ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1228,8 +1228,7 @@ def _cached(self, expr: ir.Table):
"""
op = expr.op()
if (result := self._query_cache.get(op)) is None:
self._query_cache.store(expr)
result = self._query_cache[op]
result = self._query_cache.store(expr)
return ir.CachedTable(result)

def _release_cached(self, expr: ir.CachedTable) -> None:
Expand All @@ -1241,12 +1240,12 @@ def _release_cached(self, expr: ir.CachedTable) -> None:
Cached expression to release

"""
del self._query_cache[expr.op()]
self._query_cache.release(expr.op().name)

def _load_into_cache(self, name, expr):
raise NotImplementedError(self.name)

def _clean_up_cached_table(self, op):
def _clean_up_cached_table(self, name):
raise NotImplementedError(self.name)

def _transpile_sql(self, query: str, *, dialect: str | None = None) -> str:
Expand Down
4 changes: 2 additions & 2 deletions ibis/backends/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1144,9 +1144,9 @@ def drop_view(
def _load_into_cache(self, name, expr):
self.create_table(name, expr, schema=expr.schema(), temp=True)

def _clean_up_cached_table(self, op):
def _clean_up_cached_table(self, name):
self.drop_table(
op.name,
name,
database=(self._session_dataset.project, self._session_dataset.dataset_id),
)

Expand Down
4 changes: 2 additions & 2 deletions ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ def create_table(
def _load_into_cache(self, name, expr):
self.create_table(name, expr, schema=expr.schema(), temp=True)

def _clean_up_cached_table(self, op):
self.drop_table(op.name)
def _clean_up_cached_table(self, name):
self.drop_table(name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this can be deleted to just rely on the base sql backend implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, and works


def table(
self, name: str, schema: str | None = None, database: str | None = None
Expand Down
4 changes: 2 additions & 2 deletions ibis/backends/oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,5 +627,5 @@
with contextlib.suppress(oracledb.DatabaseError):
bind.execute(f'DROP TABLE "{name}"')

def _clean_up_cached_table(self, op):
self._clean_up_tmp_table(op.name)
def _clean_up_cached_table(self, name):
self._clean_up_tmp_table(name)

Check warning on line 631 in ibis/backends/oracle/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/oracle/__init__.py#L631

Added line #L631 was not covered by tests
4 changes: 2 additions & 2 deletions ibis/backends/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@
def has_operation(cls, operation: type[ops.Value]) -> bool:
return operation in cls._get_operations()

def _clean_up_cached_table(self, op):
del self.dictionary[op.name]
def _clean_up_cached_table(self, name):
del self.dictionary[name]

Check warning on line 279 in ibis/backends/pandas/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/pandas/__init__.py#L279

Added line #L279 was not covered by tests

def to_pyarrow(
self,
Expand Down
4 changes: 2 additions & 2 deletions ibis/backends/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,8 @@
def _load_into_cache(self, name, expr):
self.create_table(name, self.compile(expr).cache())

def _clean_up_cached_table(self, op):
self._remove_table(op.name)
def _clean_up_cached_table(self, name):
self._remove_table(name)

Check warning on line 564 in ibis/backends/polars/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/polars/__init__.py#L564

Added line #L564 was not covered by tests


@lazy_singledispatch
Expand Down
3 changes: 1 addition & 2 deletions ibis/backends/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,8 +638,7 @@ def _load_into_cache(self, name, expr):
# asked to, instead of when the session ends
self._cached_dataframes[name] = t

def _clean_up_cached_table(self, op):
name = op.name
def _clean_up_cached_table(self, name):
self._session.catalog.dropTempView(name)
t = self._cached_dataframes.pop(name)
assert t.is_cached
Expand Down
1 change: 0 additions & 1 deletion ibis/backends/snowflake/tests/test_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ def ibis_sproc(session):
"snowflake-snowpark-python",
"toolz",
"atpublic",
"bidict",
"pyarrow",
"pandas",
"numpy",
Expand Down
4 changes: 2 additions & 2 deletions ibis/backends/sql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ def drop_view(
def _load_into_cache(self, name, expr):
self.create_table(name, expr, schema=expr.schema(), temp=True)

def _clean_up_cached_table(self, op):
self.drop_table(op.name)
def _clean_up_cached_table(self, name):
self.drop_table(name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to pass force=True to most of these drop_table calls (here and elsewhere) to not error if the table doesn't exist. What we care about is that the table is cleaned up, if some other mechanism already deleted it then I don't think that should be a user-facing error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


def execute(
self,
Expand Down
111 changes: 49 additions & 62 deletions ibis/backends/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1334,31 +1334,6 @@
assert result == schema


@mark.notimpl(["datafusion", "flink", "impala", "trino", "druid"])
@mark.never(
["mssql"],
reason="mssql supports support temporary tables through naming conventions",
)
@mark.notimpl(["exasol"], reason="Exasol does not support temporary tables")
@pytest.mark.never(
["risingwave"],
raises=com.UnsupportedOperationError,
reason="Feature is not yet implemented: CREATE TEMPORARY TABLE",
)
def test_persist_expression_ref_count(backend, con, alltypes):
non_persisted_table = alltypes.mutate(test_column=ibis.literal("calculation"))
persisted_table = non_persisted_table.cache()

op = non_persisted_table.op()

# ref count is unaffected without a context manager
assert con._query_cache.refs[op] == 1
backend.assert_frame_equal(
non_persisted_table.to_pandas(), persisted_table.to_pandas()
)
assert con._query_cache.refs[op] == 1


@mark.notimpl(["datafusion", "flink", "impala", "trino", "druid"])
@mark.never(
["mssql"],
Expand Down Expand Up @@ -1391,14 +1366,16 @@
raises=com.UnsupportedOperationError,
reason="Feature is not yet implemented: CREATE TEMPORARY TABLE",
)
def test_persist_expression_contextmanager(backend, alltypes):
def test_persist_expression_contextmanager(backend, con, alltypes):
non_cached_table = alltypes.mutate(
test_column=ibis.literal("calculation"), other_column=ibis.literal("big calc")
)
with non_cached_table.cache() as cached_table:
backend.assert_frame_equal(
non_cached_table.to_pandas(), cached_table.to_pandas()
)
with pytest.warns(DeprecationWarning):
with non_cached_table.cache() as cached_table:
backend.assert_frame_equal(
non_cached_table.to_pandas(), cached_table.to_pandas()
)
assert non_cached_table.op() not in con._query_cache.cache

Check warning on line 1378 in ibis/backends/tests/test_client.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/tests/test_client.py#L1378

Added line #L1378 was not covered by tests


@mark.notimpl(["datafusion", "flink", "impala", "trino", "druid"])
Expand All @@ -1417,12 +1394,12 @@
test_column=ibis.literal("calculation"), other_column=ibis.literal("big calc 2")
)
op = non_cached_table.op()
with non_cached_table.cache() as cached_table:
backend.assert_frame_equal(
non_cached_table.to_pandas(), cached_table.to_pandas()
)
assert con._query_cache.refs[op] == 1
assert con._query_cache.refs[op] == 0
cached_table = non_cached_table.cache()
backend.assert_frame_equal(non_cached_table.to_pandas(), cached_table.to_pandas())

assert op in con._query_cache.cache
del cached_table
assert op not in con._query_cache.cache

Check warning on line 1402 in ibis/backends/tests/test_client.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/tests/test_client.py#L1400-L1402

Added lines #L1400 - L1402 were not covered by tests


@mark.notimpl(["datafusion", "flink", "impala", "trino", "druid"])
Expand All @@ -1441,29 +1418,28 @@
test_column=ibis.literal("calculation"), other_column=ibis.literal("big calc 2")
)
op = non_cached_table.op()
with non_cached_table.cache() as cached_table:
backend.assert_frame_equal(
non_cached_table.to_pandas(), cached_table.to_pandas()
)
cached_table = non_cached_table.cache()

name1 = cached_table.op().name
backend.assert_frame_equal(non_cached_table.to_pandas(), cached_table.to_pandas())

with non_cached_table.cache() as nested_cached_table:
name2 = nested_cached_table.op().name
assert not nested_cached_table.to_pandas().empty
name = cached_table.op().name
nested_cached_table = non_cached_table.cache()

Check warning on line 1426 in ibis/backends/tests/test_client.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/tests/test_client.py#L1425-L1426

Added lines #L1425 - L1426 were not covered by tests

# there are two refs to the uncached expression
assert con._query_cache.refs[op] == 2
# cached tables are identical and reusing the same op
assert cached_table.op() is nested_cached_table.op()

Check warning on line 1429 in ibis/backends/tests/test_client.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/tests/test_client.py#L1429

Added line #L1429 was not covered by tests
# table is cached
assert op in con._query_cache.cache

Check warning on line 1431 in ibis/backends/tests/test_client.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/tests/test_client.py#L1431

Added line #L1431 was not covered by tests

# one ref to the uncached expression was removed by the context manager
assert con._query_cache.refs[op] == 1
# deleting the first reference, leaves table in cache
del nested_cached_table
assert op in con._query_cache.cache

Check warning on line 1435 in ibis/backends/tests/test_client.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/tests/test_client.py#L1434-L1435

Added lines #L1434 - L1435 were not covered by tests

# no refs left after the outer context manager exits
assert con._query_cache.refs[op] == 0
# deleting the last reference, releases table from cache
del cached_table
assert op not in con._query_cache.cache

Check warning on line 1439 in ibis/backends/tests/test_client.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/tests/test_client.py#L1438-L1439

Added lines #L1438 - L1439 were not covered by tests

# assert that tables have been dropped
assert name1 not in con.list_tables()
assert name2 not in con.list_tables()
# assert that table has been dropped
assert name not in con.list_tables()

Check warning on line 1442 in ibis/backends/tests/test_client.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/tests/test_client.py#L1442

Added line #L1442 was not covered by tests


@mark.notimpl(["datafusion", "flink", "impala", "trino", "druid"])
Expand All @@ -1477,13 +1453,19 @@
raises=com.UnsupportedOperationError,
reason="Feature is not yet implemented: CREATE TEMPORARY TABLE",
)
def test_persist_expression_repeated_cache(alltypes):
def test_persist_expression_repeated_cache(alltypes, con):
non_cached_table = alltypes.mutate(
test_column=ibis.literal("calculation"), other_column=ibis.literal("big calc 2")
)
with non_cached_table.cache() as cached_table:
with cached_table.cache() as nested_cached_table:
assert not nested_cached_table.to_pandas().empty
cached_table = non_cached_table.cache()
nested_cached_table = cached_table.cache()
name = cached_table.op().name

Check warning on line 1462 in ibis/backends/tests/test_client.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/tests/test_client.py#L1462

Added line #L1462 was not covered by tests

assert not nested_cached_table.to_pandas().empty

Check warning on line 1464 in ibis/backends/tests/test_client.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/tests/test_client.py#L1464

Added line #L1464 was not covered by tests

del nested_cached_table, cached_table

Check warning on line 1466 in ibis/backends/tests/test_client.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/tests/test_client.py#L1466

Added line #L1466 was not covered by tests

assert name not in con.list_tables()

Check warning on line 1468 in ibis/backends/tests/test_client.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/tests/test_client.py#L1468

Added line #L1468 was not covered by tests


@mark.notimpl(["datafusion", "flink", "impala", "trino", "druid"])
Expand All @@ -1502,12 +1484,17 @@
test_column=ibis.literal("calculation"), other_column=ibis.literal("big calc 3")
)
cached_table = non_cached_table.cache()
cached_table.release()
assert con._query_cache.refs[non_cached_table.op()] == 0
with pytest.warns(DeprecationWarning):
cached_table.release()

with pytest.raises(
com.IbisError,
match=r".+Did you call `\.release\(\)` twice on the same expression\?",
assert non_cached_table.op() not in con._query_cache.cache

Check warning on line 1490 in ibis/backends/tests/test_client.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/tests/test_client.py#L1490

Added line #L1490 was not covered by tests

with (
pytest.raises(
com.IbisError,
match=r".+Did you call `\.release\(\)` twice on the same expression\?",
),
pytest.warns(DeprecationWarning),
):
cached_table.release()

Expand Down
Loading
Loading