Skip to content

Commit

Permalink
fix(druid): get basic timestamp functionality working (#9692)
Browse files Browse the repository at this point in the history
Co-authored-by: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
  • Loading branch information
NickCrews and cpcloud authored Jul 25, 2024
1 parent 7254f65 commit 6cd3eee
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 335 deletions.
19 changes: 16 additions & 3 deletions ci/schema/druid.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,24 @@ PARTITIONED BY ALL TIME;

REPLACE INTO "functional_alltypes"
OVERWRITE ALL
SELECT *
SELECT
"id",
"bool_col",
"tinyint_col",
"smallint_col",
"int_col",
"bigint_col",
"float_col",
"double_col",
"date_string_col",
"string_col",
TIME_PARSE(CONCAT(REPLACE("timestamp_col", ' ', 'T'), 'Z')) AS "timestamp_col",
"year",
"month"
FROM TABLE(
EXTERN(
'{"type":"local","files":["/data/functional_alltypes.parquet"]}',
'{"type":"parquet"}',
'{"type":"local","files":["/data/functional_alltypes.csv"]}',
'{"type":"csv","skipHeaderRows":1,"columns":["id","bool_col","tinyint_col","smallint_col","int_col","bigint_col","float_col","double_col","date_string_col","string_col","timestamp_col","year","month"]}',
'[{"name":"id","type":"long"},{"name":"bool_col","type":"long"},{"name":"tinyint_col","type":"long"},{"name":"smallint_col","type":"long"},{"name":"int_col","type":"long"},{"name":"bigint_col","type":"long"},{"name":"float_col","type":"double"},{"name":"double_col","type":"double"},{"name":"date_string_col","type":"string"},{"name":"string_col","type":"string"},{"name":"timestamp_col","type":"string"},{"name":"year","type":"long"},{"name":"month","type":"long"}]'
)
)
Expand Down
23 changes: 18 additions & 5 deletions ibis/backends/druid/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def run_query(session: Session, query: str) -> None:
class TestConf(ServiceBackendTest):
# druid has the same rounding behavior as postgres
check_dtype = False
returned_timestamp_unit = "s"
returned_timestamp_unit = "ms"
supports_arrays = False
native_bool = True
supports_structs = False
Expand All @@ -106,13 +106,26 @@ class TestConf(ServiceBackendTest):
@property
def functional_alltypes(self) -> ir.Table:
t = self.connection.table("functional_alltypes")
# The parquet loading for booleans appears to be broken in Druid, so
# I'm using this as a workaround to make the data match what's on disk.
return t.mutate(bool_col=1 - t.id % 2)
return t.mutate(
# The parquet loading for booleans appears to be broken in Druid, so
# I'm using this as a workaround to make the data match what's on disk.
bool_col=1 - t.id % 2,
# timestamp_col is loaded as a long because druid's type system is
# awful: it does 99% of the work of a proper timestamp type, but
# encodes it as an integer. I've never seen or heard of any other
# tool that calls itself a time series database or "good for
# working with time series", that lacks a first-class timestamp
# type.
timestamp_col=t.timestamp_col.to_timestamp(unit="ms"),
)

@property
def test_files(self) -> Iterable[Path]:
return self.data_dir.joinpath("parquet").glob("*.parquet")
return [
path
for path in self.data_dir.joinpath("parquet").glob("*.parquet")
if path.name != "functional_alltypes.parquet"
] + [self.data_dir.joinpath("csv", "functional_alltypes.csv")]

def _load_data(self, **_: Any) -> None:
"""Load test data into a druid backend instance.
Expand Down
11 changes: 11 additions & 0 deletions ibis/backends/sql/compilers/druid.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import sqlglot.expressions as sge
import toolz

import ibis.common.exceptions as exc
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
from ibis.backends.sql.compilers.base import NULL, AggGen, SQLGlotCompiler
from ibis.backends.sql.datatypes import DruidType
from ibis.backends.sql.dialects import Druid
from ibis.common.temporal import TimestampUnit


class DruidCompiler(SQLGlotCompiler):
Expand Down Expand Up @@ -36,7 +38,9 @@ class DruidCompiler(SQLGlotCompiler):
ops.ArrayZip,
ops.CountDistinctStar,
ops.Covariance,
ops.Date,
ops.DateDelta,
ops.DateFromYMD,
ops.DayOfWeekIndex,
ops.DayOfWeekName,
ops.First,
Expand Down Expand Up @@ -169,6 +173,13 @@ def visit_Cast(self, op, *, arg, to):
return self.f.time_parse(arg)
return super().visit_Cast(op, arg=arg, to=to)

def visit_TimestampFromUNIX(self, op, *, arg, unit):
if unit == TimestampUnit.SECOND:
return self.f.millis_to_timestamp(arg * 1_000)
elif unit == TimestampUnit.MILLISECOND:
return self.f.millis_to_timestamp(arg)
raise exc.UnsupportedArgumentError(f"Druid doesn't support {unit} units")

def visit_TimestampFromYMDHMS(
self, op, *, year, month, day, hours, minutes, seconds
):
Expand Down
10 changes: 3 additions & 7 deletions ibis/backends/tests/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,6 @@ def mean_udf(s):
lambda t: t.timestamp_col.max(),
lambda t: t.timestamp_col.max(),
id="timestamp_max",
marks=pytest.mark.broken(
["druid"],
raises=PyDruidProgrammingError,
reason="Max aggregation is not supported for 'STRING' type SQL",
),
),
]

Expand Down Expand Up @@ -1174,12 +1169,13 @@ def test_string_quantile(alltypes, func):
assert result == "a"


@pytest.mark.notimpl(["bigquery", "sqlite"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(
["bigquery", "sqlite", "druid"], raises=com.OperationNotDefinedError
)
@pytest.mark.notyet(
["impala", "mysql", "mssql", "trino", "exasol", "flink"],
raises=com.OperationNotDefinedError,
)
@pytest.mark.broken(["druid"], raises=AttributeError)
@pytest.mark.notyet(
["snowflake"],
raises=SnowflakeProgrammingError,
Expand Down
22 changes: 7 additions & 15 deletions ibis/backends/tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,11 @@
pa = pytest.importorskip("pyarrow")

limit = [
param(
42,
id="limit",
# limit not implemented for pandas-family backends
marks=[pytest.mark.notimpl(["dask", "pandas"])],
),
# limit not implemented for pandas-family backends
param(42, id="limit", marks=pytest.mark.notimpl(["dask", "pandas"])),
]

no_limit = [
param(
None,
id="nolimit",
)
]
no_limit = [param(None, id="nolimit")]

limit_no_limit = limit + no_limit

Expand Down Expand Up @@ -426,7 +417,9 @@ def test_roundtrip_delta(backend, con, alltypes, tmp_path):


@pytest.mark.notimpl(
["druid"], raises=AttributeError, reason="string type is used for timestamp_col"
["druid"],
raises=PyDruidProgrammingError,
reason="Invalid SQL generated; druid doesn't know about TIMESTAMPTZ",
)
def test_arrow_timestamp_with_time_zone(alltypes):
from ibis.formats.pyarrow import PyArrowType
Expand Down Expand Up @@ -512,9 +505,8 @@ def test_to_pandas_batches_column(backend, con, n):
assert sum(map(len, t.to_pandas_batches())) == n


@pytest.mark.notimpl(["druid"])
def test_to_pandas_batches_scalar(backend, con):
t = backend.functional_alltypes.timestamp_col.max()
t = backend.functional_alltypes.int_col.max()
expected = t.execute()

result1 = list(con.to_pandas_batches(t))
Expand Down
1 change: 0 additions & 1 deletion ibis/backends/tests/test_generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,6 @@ def test_between(backend, alltypes, df):
backend.assert_series_equal(result, expected)


@pytest.mark.notimpl(["druid"])
def test_interactive(alltypes, monkeypatch):
monkeypatch.setattr(ibis.options, "interactive", True)

Expand Down
8 changes: 1 addition & 7 deletions ibis/backends/tests/test_param.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,25 +124,19 @@ def test_scalar_param_map(con):
marks=[pytest.mark.notimpl(["druid"])],
),
param(
"2009-01-20 01:02:03",
"timestamp",
"timestamp_col",
id="string_timestamp",
marks=[pytest.mark.notimpl(["druid"])],
"2009-01-20 01:02:03", "timestamp", "timestamp_col", id="string_timestamp"
),
param(
datetime.date(2009, 1, 20),
"timestamp",
"timestamp_col",
id="date_timestamp",
marks=[pytest.mark.notimpl(["druid"])],
),
param(
datetime.datetime(2009, 1, 20, 1, 2, 3),
"timestamp",
"timestamp_col",
id="datetime_timestamp",
marks=[pytest.mark.notimpl(["druid"])],
),
],
)
Expand Down
Loading

0 comments on commit 6cd3eee

Please sign in to comment.