Skip to content

Commit

Permalink
feat(api): count nulls with topk
Browse files Browse the repository at this point in the history
BREAKING CHANGE: `expr.topk(...)` now includes null counts. The row count of the `topk` call will not differ, but the number of nulls counted will no longer be zero. To drop the `null` row use the `dropna` method.
  • Loading branch information
cpcloud committed Mar 14, 2024
1 parent 220085e commit 680d97d
Show file tree
Hide file tree
Showing 24 changed files with 127 additions and 20 deletions.
3 changes: 3 additions & 0 deletions ci/schema/clickhouse.sql
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,6 @@ INSERT INTO ibis_testing.win VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

CREATE OR REPLACE TABLE ibis_testing.topk (x Nullable(Int64)) ENGINE = Memory;
INSERT INTO ibis_testing.topk VALUES (1), (1), (NULL);
4 changes: 4 additions & 0 deletions ci/schema/duckdb.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ CREATE OR REPLACE TABLE map (idx BIGINT, kv MAP(STRING, BIGINT));
INSERT INTO map VALUES
(1, MAP(['a', 'b', 'c'], [1, 2, 3])),
(2, MAP(['d', 'e', 'f'], [4, 5, 6]));


CREATE OR REPLACE TABLE topk (x BIGINT);
INSERT INTO topk VALUES (1), (1), (NULL);
3 changes: 3 additions & 0 deletions ci/schema/exasol.sql
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,6 @@ INSERT INTO "win" VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

CREATE OR REPLACE TABLE EXASOL."topk" ("x" BIGINT);
INSERT INTO "topk" VALUES (1), (1), (NULL);
5 changes: 5 additions & 0 deletions ci/schema/mssql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,8 @@ INSERT INTO win VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

DROP TABLE IF EXISTS topk;

CREATE TABLE topk (x BIGINT);
INSERT INTO topk VALUES (1), (1), (NULL);
5 changes: 5 additions & 0 deletions ci/schema/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,8 @@ INSERT INTO win VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

DROP TABLE IF EXISTS topk CASCADE;

CREATE TABLE topk (x BIGINT);
INSERT INTO topk VALUES (1), (1), (NULL);
5 changes: 5 additions & 0 deletions ci/schema/oracle.sql
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,9 @@ INSERT INTO "win" VALUES
('a', 3, 1),
('a', 4, 1);

DROP TABLE IF EXISTS "topk";

CREATE TABLE "topk" ("x" NUMBER(18));
INSERT INTO "topk" VALUES (1), (1), (NULL);

COMMIT;
5 changes: 5 additions & 0 deletions ci/schema/postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,8 @@ CREATE TABLE map (idx BIGINT, kv HSTORE);
INSERT INTO map VALUES
(1, 'a=>1,b=>2,c=>3'),
(2, 'd=>4,e=>5,c=>6');

DROP TABLE IF EXISTS topk;

CREATE TABLE topk (x BIGINT);
INSERT INTO topk VALUES (1), (1), (NULL);
5 changes: 5 additions & 0 deletions ci/schema/risingwave.sql
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,8 @@ INSERT INTO "win" VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

DROP TABLE IF EXISTS "topk";

CREATE TABLE "topk" ("x" BIGINT);
INSERT INTO "topk" VALUES (1), (1), (NULL);
3 changes: 3 additions & 0 deletions ci/schema/snowflake.sql
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,6 @@ INSERT INTO "win" VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

CREATE OR REPLACE TABLE "topk" ("x" BIGINT);
INSERT INTO "topk" VALUES (1), (1), (NULL);
4 changes: 4 additions & 0 deletions ci/schema/sqlite.sql
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,7 @@ INSERT INTO win VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

DROP TABLE IF EXISTS topk;
CREATE TABLE "topk" ("x" BIGINT);
INSERT INTO "topk" VALUES (1), (1), (NULL);
4 changes: 4 additions & 0 deletions ci/schema/trino.sql
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,7 @@ INSERT INTO win VALUES
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);

DROP TABLE IF EXISTS topk;
CREATE TABLE topk (x BIGINT);
INSERT INTO topk VALUES (1), (1), (NULL);
21 changes: 20 additions & 1 deletion ibis/backends/bigquery/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
from ibis.backends.bigquery.datatypes import BigQuerySchema
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.tests.base import BackendTest
from ibis.backends.tests.data import json_types, non_null_array_types, struct_types, win
from ibis.backends.tests.data import (
json_types,
non_null_array_types,
struct_types,
topk,
win,
)

DATASET_ID = "ibis_gbq_testing"
DATASET_ID_TOKYO = "ibis_gbq_testing_tokyo"
Expand Down Expand Up @@ -215,6 +221,19 @@ def _load_data(self, **_: Any) -> None:
)
)

futures.append(
e.submit(
make_job,
client.load_table_from_dataframe,
topk.to_pandas(),
bq.TableReference(testing_dataset, "topk"),
job_config=bq.LoadJobConfig(
write_disposition=write_disposition,
schema=BigQuerySchema.from_ibis(ibis.schema(dict(x="int64"))),
),
)
)

futures.append(
e.submit(
make_job,
Expand Down
7 changes: 6 additions & 1 deletion ibis/backends/dask/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import ibis.expr.datatypes as dt
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.pandas.tests.conftest import TestConf as PandasTest
from ibis.backends.tests.data import array_types, json_types, win
from ibis.backends.tests.data import array_types, json_types, topk, win

dd = pytest.importorskip("dask.dataframe")

Expand Down Expand Up @@ -66,6 +66,11 @@ def _load_data(self, **_: Any) -> None:
dd.from_pandas(json_types, npartitions=NPARTITIONS),
overwrite=True,
)
con.create_table(
"topk",
dd.from_pandas(topk.to_pandas(), npartitions=NPARTITIONS),
overwrite=True,
)

@classmethod
def assert_series_equal(
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/datafusion/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import ibis
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.tests.base import BackendTest
from ibis.backends.tests.data import array_types, win
from ibis.backends.tests.data import array_types, topk, win


class TestConf(BackendTest):
Expand All @@ -27,6 +27,7 @@ def _load_data(self, **_: Any) -> None:
con.register(path, table_name=table_name)
con.register(array_types, table_name="array_types")
con.register(win, table_name="win")
con.register(topk, table_name="topk")

@staticmethod
def connect(*, tmpdir, worker_id, **kw):
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/flink/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import ibis
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.tests.base import BackendTest
from ibis.backends.tests.data import array_types, json_types, struct_types, win
from ibis.backends.tests.data import array_types, json_types, struct_types, topk, win

Check warning on line 11 in ibis/backends/flink/tests/conftest.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/tests/conftest.py#L11

Added line #L11 was not covered by tests


class TestConf(BackendTest):
Expand Down Expand Up @@ -75,6 +75,7 @@ def _load_data(self, **_: Any) -> None:
schema=ibis.schema({"idx": "int64", "kv": "map<string, int64>"}),
temp=True,
)
con.create_table("topk", topk, temp=True)

Check warning on line 78 in ibis/backends/flink/tests/conftest.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/tests/conftest.py#L78

Added line #L78 was not covered by tests


class TestConfForStreaming(TestConf):
Expand Down
4 changes: 4 additions & 0 deletions ibis/backends/impala/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ def _load_data(self, **_: Any) -> None:
('a', 4, 1)
"""
)
con.create_table(
"topk", schema=ibis.schema({"x": "int64"}), database="ibis_testing"
)
con.raw_sql("INSERT INTO ibis_testing.topk VALUES (1), (1), (NULL)")
assert con.list_tables(database="ibis_testing")

def postload(self, **kw):
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/pandas/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.pandas import Backend
from ibis.backends.tests.base import BackendTest
from ibis.backends.tests.data import array_types, json_types, struct_types, win
from ibis.backends.tests.data import array_types, json_types, struct_types, topk, win


class TestConf(BackendTest):
Expand All @@ -35,6 +35,7 @@ def _load_data(self, **_: Any) -> None:
con.create_table("struct", struct_types, overwrite=True)
con.create_table("win", win, overwrite=True)
con.create_table("json_t", json_types, overwrite=True)
con.create_table("topk", topk.to_pandas(), overwrite=True)

@staticmethod
def connect(*, tmpdir, worker_id, **kw):
Expand Down
6 changes: 5 additions & 1 deletion ibis/backends/polars/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
from typing import Any

import numpy as np
import polars as pl
import pytest

import ibis
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.tests.base import BackendTest
from ibis.backends.tests.data import array_types, struct_types, win
from ibis.backends.tests.data import array_types, struct_types, topk, win


class TestConf(BackendTest):
Expand All @@ -27,6 +28,9 @@ def _load_data(self, **_: Any) -> None:
con.register(struct_types, table_name="struct")
con.register(win, table_name="win")

# TODO: remove when pyarrow inputs are supported
con._add_table("topk", pl.from_arrow(topk).lazy())

@staticmethod
def connect(*, tmpdir, worker_id, **kw):
return ibis.polars.connect(**kw)
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/pyspark/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ibis import util
from ibis.backends.conftest import TEST_TABLES
from ibis.backends.tests.base import BackendTest
from ibis.backends.tests.data import json_types, win
from ibis.backends.tests.data import json_types, topk, win


def set_pyspark_database(con, database):
Expand Down Expand Up @@ -129,6 +129,7 @@ def _load_data(self, **_: Any) -> None:

s.createDataFrame(json_types).createOrReplaceTempView("json_t")
s.createDataFrame(win).createOrReplaceTempView("win")
s.createDataFrame(topk.to_pandas()).createOrReplaceTempView("topk")

@staticmethod
def connect(*, tmpdir, worker_id, **kw):
Expand Down
3 changes: 3 additions & 0 deletions ibis/backends/tests/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import numpy as np
import pandas as pd
import pyarrow as pa

array_types = pd.DataFrame(
[
Expand Down Expand Up @@ -124,3 +125,5 @@
"y": [3, 2, 0, 1, 1],
}
)

topk = pa.Table.from_pydict({"x": [1, 1, None]})
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ SELECT
FROM (
SELECT
"t1"."city",
"t1"."Count(city)"
"t1"."CountStar(tbl)"
FROM (
SELECT
"t0"."city",
COUNT("t0"."city") AS "Count(city)"
COUNT(*) AS "CountStar(tbl)"
FROM "tbl" AS "t0"
GROUP BY
1
) AS "t1"
ORDER BY
"t1"."Count(city)" DESC
"t1"."CountStar(tbl)" DESC
LIMIT 10
) AS "t3"
LIMIT 5
Expand All @@ -23,17 +23,17 @@ OFFSET (
FROM (
SELECT
"t1"."city",
"t1"."Count(city)"
"t1"."CountStar(tbl)"
FROM (
SELECT
"t0"."city",
COUNT("t0"."city") AS "Count(city)"
COUNT(*) AS "CountStar(tbl)"
FROM "tbl" AS "t0"
GROUP BY
1
) AS "t1"
ORDER BY
"t1"."Count(city)" DESC
"t1"."CountStar(tbl)" DESC
LIMIT 10
) AS "t3"
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ FROM "tbl" AS "t1"
SEMI JOIN (
SELECT
"t2"."city",
"t2"."Count(city)"
"t2"."CountStar(tbl)"
FROM (
SELECT
"t0"."city",
COUNT("t0"."city") AS "Count(city)"
COUNT(*) AS "CountStar(tbl)"
FROM "tbl" AS "t0"
GROUP BY
1
) AS "t2"
ORDER BY
"t2"."Count(city)" DESC
"t2"."CountStar(tbl)" DESC
LIMIT 10
) AS "t5"
ON "t1"."city" = "t5"."city"
23 changes: 22 additions & 1 deletion ibis/backends/tests/test_generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1884,7 +1884,7 @@ def test_isnull_equality(con, backend, monkeypatch):
["druid"],
raises=PyDruidProgrammingError,
reason=(
"Query could not be planned. A possible reason is [SQL query requires ordering a "
"Query could not be planned. A possible reason is SQL query requires ordering a "
"table by non-time column [[id]], which is not supported."
),
)
Expand All @@ -1898,3 +1898,24 @@ def test_subsequent_overlapping_order_by(con, backend, alltypes, df):
result = con.execute(ts2)
expected = df.sort_values("id", ascending=False).reset_index(drop=True)
backend.assert_frame_equal(result, expected)


@pytest.mark.broken(
["clickhouse"],
raises=AssertionError,
reason="https://github.com/ClickHouse/ClickHouse/issues/61313",
)
@pytest.mark.notimpl(
["pandas", "dask"], raises=IndexError, reason="NaN isn't treated as NULL"
)
@pytest.mark.notimpl(
["druid"],
raises=AttributeError,
reason="inserting three rows into druid is difficult",
)
def test_topk_counts_null(con):
t = con.tables.topk
tk = t.x.topk(10)
tkf = tk.filter(_.x.isnull())[1]
result = con.to_pyarrow(tkf)
assert result[0].as_py() == 1
7 changes: 4 additions & 3 deletions ibis/expr/types/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1823,10 +1823,11 @@ def topk(
raise com.IbisTypeError("TopK must depend on exactly one table.")

table = table.to_expr()

if by is None:
metric = self.count()
else:
(metric,) = bind(table, by)
by = lambda t: t.count()

(metric,) = bind(table, by)

return table.aggregate(metric, by=[self]).order_by(metric.desc()).limit(k)

Expand Down

0 comments on commit 680d97d

Please sign in to comment.