Skip to content

Commit

Permalink
feat(api): count nulls with topk (#8531)
Browse files Browse the repository at this point in the history
Default to count star instead of count(expr) in topk API.

BREAKING CHANGE: expr.topk(...) now includes null counts. The row count of the topk call will not differ, but the number of nulls counted will no longer be zero. To drop the null row use the dropna method.
  • Loading branch information
cpcloud authored Mar 14, 2024
1 parent 14358fe commit 54c2c70
Show file tree
Hide file tree
Showing 24 changed files with 127 additions and 20 deletions.
3 changes: 3 additions & 0 deletions ci/schema/clickhouse.sql
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,6 @@ INSERT INTO ibis_testing.win VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

CREATE OR REPLACE TABLE ibis_testing.topk (x Nullable(Int64)) ENGINE = Memory;
INSERT INTO ibis_testing.topk VALUES (1), (1), (NULL);
4 changes: 4 additions & 0 deletions ci/schema/duckdb.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ CREATE OR REPLACE TABLE map (idx BIGINT, kv MAP(STRING, BIGINT));
INSERT INTO map VALUES
(1, MAP(['a', 'b', 'c'], [1, 2, 3])),
(2, MAP(['d', 'e', 'f'], [4, 5, 6]));


CREATE OR REPLACE TABLE topk (x BIGINT);
INSERT INTO topk VALUES (1), (1), (NULL);
3 changes: 3 additions & 0 deletions ci/schema/exasol.sql
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,6 @@ INSERT INTO "win" VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

CREATE OR REPLACE TABLE EXASOL."topk" ("x" BIGINT);
INSERT INTO "topk" VALUES (1), (1), (NULL);
5 changes: 5 additions & 0 deletions ci/schema/mssql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,8 @@ INSERT INTO win VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

DROP TABLE IF EXISTS topk;

CREATE TABLE topk (x BIGINT);
INSERT INTO topk VALUES (1), (1), (NULL);
5 changes: 5 additions & 0 deletions ci/schema/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,8 @@ INSERT INTO win VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

DROP TABLE IF EXISTS topk CASCADE;

CREATE TABLE topk (x BIGINT);
INSERT INTO topk VALUES (1), (1), (NULL);
5 changes: 5 additions & 0 deletions ci/schema/oracle.sql
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,9 @@ INSERT INTO "win" VALUES
('a', 3, 1),
('a', 4, 1);

DROP TABLE IF EXISTS "topk";

CREATE TABLE "topk" ("x" NUMBER(18));
INSERT INTO "topk" VALUES (1), (1), (NULL);

COMMIT;
5 changes: 5 additions & 0 deletions ci/schema/postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,8 @@ CREATE TABLE map (idx BIGINT, kv HSTORE);
INSERT INTO map VALUES
(1, 'a=>1,b=>2,c=>3'),
(2, 'd=>4,e=>5,c=>6');

DROP TABLE IF EXISTS topk;

CREATE TABLE topk (x BIGINT);
INSERT INTO topk VALUES (1), (1), (NULL);
5 changes: 5 additions & 0 deletions ci/schema/risingwave.sql
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,8 @@ INSERT INTO "win" VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

DROP TABLE IF EXISTS "topk";

CREATE TABLE "topk" ("x" BIGINT);
INSERT INTO "topk" VALUES (1), (1), (NULL);
3 changes: 3 additions & 0 deletions ci/schema/snowflake.sql
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,6 @@ INSERT INTO "win" VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

CREATE OR REPLACE TABLE "topk" ("x" BIGINT);
INSERT INTO "topk" VALUES (1), (1), (NULL);
4 changes: 4 additions & 0 deletions ci/schema/sqlite.sql
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,7 @@ INSERT INTO win VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

DROP TABLE IF EXISTS topk;
CREATE TABLE "topk" ("x" BIGINT);
INSERT INTO "topk" VALUES (1), (1), (NULL);
4 changes: 4 additions & 0 deletions ci/schema/trino.sql
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,7 @@ INSERT INTO win VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

DROP TABLE IF EXISTS topk;
CREATE TABLE topk (x BIGINT);
INSERT INTO topk VALUES (1), (1), (NULL);
21 changes: 20 additions & 1 deletion ibis/backends/bigquery/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
from ibis.backends.bigquery.datatypes import BigQuerySchema
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.tests.base import BackendTest
from ibis.backends.tests.data import json_types, non_null_array_types, struct_types, win
from ibis.backends.tests.data import (
json_types,
non_null_array_types,
struct_types,
topk,
win,
)

DATASET_ID = "ibis_gbq_testing"
DATASET_ID_TOKYO = "ibis_gbq_testing_tokyo"
Expand Down Expand Up @@ -215,6 +221,19 @@ def _load_data(self, **_: Any) -> None:
)
)

futures.append(
e.submit(
make_job,
client.load_table_from_dataframe,
topk.to_pandas(),
bq.TableReference(testing_dataset, "topk"),
job_config=bq.LoadJobConfig(
write_disposition=write_disposition,
schema=BigQuerySchema.from_ibis(ibis.schema(dict(x="int64"))),
),
)
)

futures.append(
e.submit(
make_job,
Expand Down
7 changes: 6 additions & 1 deletion ibis/backends/dask/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import ibis.expr.datatypes as dt
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.pandas.tests.conftest import TestConf as PandasTest
from ibis.backends.tests.data import array_types, json_types, win
from ibis.backends.tests.data import array_types, json_types, topk, win

dd = pytest.importorskip("dask.dataframe")

Expand Down Expand Up @@ -66,6 +66,11 @@ def _load_data(self, **_: Any) -> None:
dd.from_pandas(json_types, npartitions=NPARTITIONS),
overwrite=True,
)
con.create_table(
"topk",
dd.from_pandas(topk.to_pandas(), npartitions=NPARTITIONS),
overwrite=True,
)

@classmethod
def assert_series_equal(
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/datafusion/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import ibis
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.tests.base import BackendTest
from ibis.backends.tests.data import array_types, win
from ibis.backends.tests.data import array_types, topk, win


class TestConf(BackendTest):
Expand All @@ -27,6 +27,7 @@ def _load_data(self, **_: Any) -> None:
con.register(path, table_name=table_name)
con.register(array_types, table_name="array_types")
con.register(win, table_name="win")
con.register(topk, table_name="topk")

@staticmethod
def connect(*, tmpdir, worker_id, **kw):
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/flink/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import ibis
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.tests.base import BackendTest
from ibis.backends.tests.data import array_types, json_types, struct_types, win
from ibis.backends.tests.data import array_types, json_types, struct_types, topk, win


class TestConf(BackendTest):
Expand Down Expand Up @@ -75,6 +75,7 @@ def _load_data(self, **_: Any) -> None:
schema=ibis.schema({"idx": "int64", "kv": "map<string, int64>"}),
temp=True,
)
con.create_table("topk", topk, temp=True)


class TestConfForStreaming(TestConf):
Expand Down
4 changes: 4 additions & 0 deletions ibis/backends/impala/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ def _load_data(self, **_: Any) -> None:
('a', 4, 1)
"""
)
con.create_table(
"topk", schema=ibis.schema({"x": "int64"}), database="ibis_testing"
)
con.raw_sql("INSERT INTO ibis_testing.topk VALUES (1), (1), (NULL)")
assert con.list_tables(database="ibis_testing")

def postload(self, **kw):
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/pandas/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.pandas import Backend
from ibis.backends.tests.base import BackendTest
from ibis.backends.tests.data import array_types, json_types, struct_types, win
from ibis.backends.tests.data import array_types, json_types, struct_types, topk, win


class TestConf(BackendTest):
Expand All @@ -35,6 +35,7 @@ def _load_data(self, **_: Any) -> None:
con.create_table("struct", struct_types, overwrite=True)
con.create_table("win", win, overwrite=True)
con.create_table("json_t", json_types, overwrite=True)
con.create_table("topk", topk.to_pandas(), overwrite=True)

@staticmethod
def connect(*, tmpdir, worker_id, **kw):
Expand Down
6 changes: 5 additions & 1 deletion ibis/backends/polars/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
from typing import Any

import numpy as np
import polars as pl
import pytest

import ibis
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.tests.base import BackendTest
from ibis.backends.tests.data import array_types, struct_types, win
from ibis.backends.tests.data import array_types, struct_types, topk, win


class TestConf(BackendTest):
Expand All @@ -27,6 +28,9 @@ def _load_data(self, **_: Any) -> None:
con.register(struct_types, table_name="struct")
con.register(win, table_name="win")

# TODO: remove when pyarrow inputs are supported
con._add_table("topk", pl.from_arrow(topk).lazy())

@staticmethod
def connect(*, tmpdir, worker_id, **kw):
return ibis.polars.connect(**kw)
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/pyspark/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ibis import util
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.tests.base import BackendTest
from ibis.backends.tests.data import json_types, win
from ibis.backends.tests.data import json_types, topk, win


def set_pyspark_database(con, database):
Expand Down Expand Up @@ -129,6 +129,7 @@ def _load_data(self, **_: Any) -> None:

s.createDataFrame(json_types).createOrReplaceTempView("json_t")
s.createDataFrame(win).createOrReplaceTempView("win")
s.createDataFrame(topk.to_pandas()).createOrReplaceTempView("topk")

@staticmethod
def connect(*, tmpdir, worker_id, **kw):
Expand Down
3 changes: 3 additions & 0 deletions ibis/backends/tests/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import numpy as np
import pandas as pd
import pyarrow as pa

array_types = pd.DataFrame(
[
Expand Down Expand Up @@ -124,3 +125,5 @@
"y": [3, 2, 0, 1, 1],
}
)

topk = pa.Table.from_pydict({"x": [1, 1, None]})
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ SELECT
FROM (
SELECT
"t1"."city",
"t1"."Count(city)"
"t1"."CountStar(tbl)"
FROM (
SELECT
"t0"."city",
COUNT("t0"."city") AS "Count(city)"
COUNT(*) AS "CountStar(tbl)"
FROM "tbl" AS "t0"
GROUP BY
1
) AS "t1"
ORDER BY
"t1"."Count(city)" DESC
"t1"."CountStar(tbl)" DESC
LIMIT 10
) AS "t3"
LIMIT 5
Expand All @@ -23,17 +23,17 @@ OFFSET (
FROM (
SELECT
"t1"."city",
"t1"."Count(city)"
"t1"."CountStar(tbl)"
FROM (
SELECT
"t0"."city",
COUNT("t0"."city") AS "Count(city)"
COUNT(*) AS "CountStar(tbl)"
FROM "tbl" AS "t0"
GROUP BY
1
) AS "t1"
ORDER BY
"t1"."Count(city)" DESC
"t1"."CountStar(tbl)" DESC
LIMIT 10
) AS "t3"
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ FROM "tbl" AS "t1"
SEMI JOIN (
SELECT
"t2"."city",
"t2"."Count(city)"
"t2"."CountStar(tbl)"
FROM (
SELECT
"t0"."city",
COUNT("t0"."city") AS "Count(city)"
COUNT(*) AS "CountStar(tbl)"
FROM "tbl" AS "t0"
GROUP BY
1
) AS "t2"
ORDER BY
"t2"."Count(city)" DESC
"t2"."CountStar(tbl)" DESC
LIMIT 10
) AS "t5"
ON "t1"."city" = "t5"."city"
23 changes: 22 additions & 1 deletion ibis/backends/tests/test_generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1884,7 +1884,7 @@ def test_isnull_equality(con, backend, monkeypatch):
["druid"],
raises=PyDruidProgrammingError,
reason=(
"Query could not be planned. A possible reason is [SQL query requires ordering a "
"Query could not be planned. A possible reason is SQL query requires ordering a "
"table by non-time column [[id]], which is not supported."
),
)
Expand All @@ -1898,3 +1898,24 @@ def test_subsequent_overlapping_order_by(con, backend, alltypes, df):
result = con.execute(ts2)
expected = df.sort_values("id", ascending=False).reset_index(drop=True)
backend.assert_frame_equal(result, expected)


@pytest.mark.broken(
["clickhouse"],
raises=AssertionError,
reason="https://github.com/ClickHouse/ClickHouse/issues/61313",
)
@pytest.mark.notimpl(
["pandas", "dask"], raises=IndexError, reason="NaN isn't treated as NULL"
)
@pytest.mark.notimpl(
["druid"],
raises=AttributeError,
reason="inserting three rows into druid is difficult",
)
def test_topk_counts_null(con):
t = con.tables.topk
tk = t.x.topk(10)
tkf = tk.filter(_.x.isnull())[1]
result = con.to_pyarrow(tkf)
assert result[0].as_py() == 1
7 changes: 4 additions & 3 deletions ibis/expr/types/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1823,10 +1823,11 @@ def topk(
raise com.IbisTypeError("TopK must depend on exactly one table.")

table = table.to_expr()

if by is None:
metric = self.count()
else:
(metric,) = bind(table, by)
by = lambda t: t.count()

(metric,) = bind(table, by)

return table.aggregate(metric, by=[self]).order_by(metric.desc()).limit(k)

Expand Down

0 comments on commit 54c2c70

Please sign in to comment.