Skip to content

Commit

Permalink
Add duration datatypes (#683)
Browse files Browse the repository at this point in the history
  • Loading branch information
billylanchantin authored Aug 24, 2023
1 parent d7b204f commit 6836f29
Show file tree
Hide file tree
Showing 16 changed files with 819 additions and 63 deletions.
67 changes: 64 additions & 3 deletions lib/explorer/backend/lazy_series.ex
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ defmodule Explorer.Backend.LazySeries do

@comparison_operations [:equal, :not_equal, :greater, :greater_equal, :less, :less_equal]

@arithmetic_operations [:add, :subtract, :multiply, :pow, :quotient, :remainder]
@arithmetic_operations [:pow, :quotient, :remainder]

@aggregation_operations [
:sum,
Expand Down Expand Up @@ -184,18 +184,44 @@ defmodule Explorer.Backend.LazySeries do
def dtype(%Series{} = s), do: s.dtype

@impl true
def cast(%Series{} = s, dtype) when is_atom(dtype) do
@valid_dtypes Explorer.Shared.dtypes()
def cast(%Series{} = s, dtype) when dtype in @valid_dtypes do
args = [lazy_series!(s), dtype]
data = new(:cast, args, aggregations?(args))

Backend.Series.new(data, dtype)
end

@impl true
def add(left, right) do
args = [data!(left), data!(right)]
data = new(:add, args, aggregations?(args))
dtype = resolve_numeric_temporal_dtype(:add, left, right)
Backend.Series.new(data, dtype)
end

@impl true
def subtract(left, right) do
args = [data!(left), data!(right)]
data = new(:subtract, args, aggregations?(args))
dtype = resolve_numeric_temporal_dtype(:subtract, left, right)
Backend.Series.new(data, dtype)
end

@impl true
def multiply(left, right) do
args = [data!(left), data!(right)]
data = new(:multiply, args, aggregations?(args))
dtype = resolve_numeric_temporal_dtype(:multiply, left, right)
Backend.Series.new(data, dtype)
end

@impl true
def divide(left, right) do
args = [data!(left), data!(right)]
data = new(:divide, args, aggregations?(args))
Backend.Series.new(data, :float)
dtype = resolve_numeric_temporal_dtype(:divide, left, right)
Backend.Series.new(data, dtype)
end

@impl true
Expand Down Expand Up @@ -635,6 +661,41 @@ defmodule Explorer.Backend.LazySeries do
defp resolve_numeric_dtype(:window_mean, _items), do: :float
defp resolve_numeric_dtype(_op, items), do: resolve_numeric_dtype(items)

defp resolve_numeric_temporal_dtype(op, %Series{dtype: ldt} = left, %Series{dtype: rdt} = right) do
case {op, ldt, rdt} do
{:add, {:datetime, ltu}, {:duration, rtu}} -> {:datetime, highest_precision(ltu, rtu)}
{:add, {:duration, ltu}, {:datetime, rtu}} -> {:datetime, highest_precision(ltu, rtu)}
{:add, {:duration, ltu}, {:duration, rtu}} -> {:duration, highest_precision(ltu, rtu)}
{:subtract, {:datetime, ltu}, {:datetime, rtu}} -> {:duration, highest_precision(ltu, rtu)}
{:subtract, {:datetime, ltu}, {:duration, rtu}} -> {:datetime, highest_precision(ltu, rtu)}
{:subtract, {:duration, ltu}, {:duration, rtu}} -> {:duration, highest_precision(ltu, rtu)}
{:multiply, :integer, {:duration, tu}} -> {:duration, tu}
{:multiply, {:duration, tu}, :integer} -> {:duration, tu}
{:divide, {:duration, tu}, :integer} -> {:duration, tu}
{:divide, _, {:duration, _}} -> raise("cannot divide by duration")
{:divide, _, _} -> :float
_ -> resolve_numeric_dtype([left, right])
end
end

defp resolve_numeric_temporal_dtype(op, left, right) do
case op do
:divide -> :float
_ -> resolve_numeric_dtype([left, right])
end
end

defp highest_precision(left_timeunit, right_timeunit) do
# Higher precision wins, otherwise information is lost.
case {left_timeunit, right_timeunit} do
{equal, equal} -> equal
{:nanosecond, _} -> :nanosecond
{_, :nanosecond} -> :nanosecond
{:microsecond, _} -> :microsecond
{_, :microsecond} -> :microsecond
end
end

# Returns the inner `data` if it's a lazy series. Otherwise raises an error.
defp lazy_series!(series) do
case series do
Expand Down
64 changes: 64 additions & 0 deletions lib/explorer/duration.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
defmodule Explorer.Duration do
# Internal representation of a duration.
@moduledoc false
alias Explorer.Duration

@enforce_keys [:value, :precision]
defstruct [:value, :precision]

# Nanosecond constants
@us_ns 1_000
@ms_ns 1_000 * @us_ns
@sec_ns 1_000 * @ms_ns
@min_ns 60 * @sec_ns
@hour_ns 60 * @min_ns
@day_ns 24 * @hour_ns

def to_string(%Explorer.Duration{value: value, precision: precision}) do
case precision do
:millisecond -> format_nanoseconds(value * @ms_ns)
:microsecond -> format_nanoseconds(value * @us_ns)
:nanosecond -> format_nanoseconds(value)
end
end

defp format_nanoseconds(nanoseconds) when is_integer(nanoseconds) do
result = nanoseconds |> abs |> format_pos_nanoseconds()

if nanoseconds < 0 do
"-" <> result
else
result
end
end

defp format_pos_nanoseconds(nanoseconds) when is_integer(nanoseconds) and nanoseconds >= 0 do
[d: @day_ns, h: @hour_ns, m: @min_ns, s: @sec_ns, ms: @ms_ns, us: @us_ns, ns: 1]
|> Enum.reduce({[], nanoseconds}, fn {unit, ns_per_unit}, {parts, ns} ->
{num_units, remaining_ns} =
if ns >= ns_per_unit do
{div(ns, ns_per_unit), rem(ns, ns_per_unit)}
else
{0, ns}
end

{[{unit, num_units} | parts], remaining_ns}
end)
|> then(fn {parts_reversed, _} -> parts_reversed end)
|> Enum.reverse()
|> Enum.reject(fn {_unit, value} -> value == 0 end)
|> Enum.map_join(" ", fn {unit, value} -> "#{value}#{unit}" end)
|> case do
"" -> "0"
result -> result
end
end

defimpl String.Chars do
def to_string(%Duration{} = duration), do: Duration.to_string(duration)
end

defimpl Inspect do
def inspect(%Duration{} = duration, _), do: "Duration[" <> Duration.to_string(duration) <> "]"
end
end
5 changes: 3 additions & 2 deletions lib/explorer/polars_backend/expression.ex
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,9 @@ defmodule Explorer.PolarsBackend.Expression do
end

def to_expr(%LazySeries{op: :cast, args: [lazy_series, dtype]}) do
expr = to_expr(lazy_series)
Native.expr_cast(expr, Atom.to_string(dtype))
lazy_series_expr = to_expr(lazy_series)
dtype_expr = Explorer.Shared.dtype_to_string(dtype)
Native.expr_cast(lazy_series_expr, dtype_expr)
end

def to_expr(%LazySeries{op: :fill_missing_with_strategy, args: [lazy_series, strategy]}) do
Expand Down
1 change: 1 addition & 0 deletions lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ defmodule Explorer.PolarsBackend.Native do
def s_from_list_date(_name, _val), do: err()
def s_from_list_time(_name, _val), do: err()
def s_from_list_datetime(_name, _val, _precision), do: err()
def s_from_list_duration(_name, _val, _precision), do: err()
def s_from_list_f64(_name, _val), do: err()
def s_from_list_i64(_name, _val), do: err()
def s_from_list_u32(_name, _val), do: err()
Expand Down
16 changes: 16 additions & 0 deletions lib/explorer/polars_backend/series.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ defmodule Explorer.PolarsBackend.Series do
def cast(series, {:datetime, :nanosecond}),
do: Shared.apply_series(series, :s_cast, ["datetime[ns]"])

def cast(series, {:duration, :millisecond}),
do: Shared.apply_series(series, :s_cast, ["duration[ms]"])

def cast(series, {:duration, :microsecond}),
do: Shared.apply_series(series, :s_cast, ["duration[μs]"])

def cast(series, {:duration, :nanosecond}),
do: Shared.apply_series(series, :s_cast, ["duration[ns]"])

def cast(series, dtype), do: Shared.apply_series(series, :s_cast, [Atom.to_string(dtype)])

@impl true
Expand Down Expand Up @@ -78,6 +87,9 @@ defmodule Explorer.PolarsBackend.Series do
"datetime[ms]" -> {:s, 64}
"datetime[μs]" -> {:s, 64}
"datetime[ns]" -> {:s, 64}
"duration[ms]" -> {:s, 64}
"duration[μs]" -> {:s, 64}
"duration[ns]" -> {:s, 64}
"cat" -> {:u, 32}
dtype -> raise "cannot convert dtype #{inspect(dtype)} to iotype"
end
Expand Down Expand Up @@ -681,6 +693,10 @@ defmodule Explorer.PolarsBackend.Series do
defp to_mod_series(value, %{dtype: :integer}, mod) when is_float(value) or is_non_finite(value),
do: mod.from_list([value], :float)

defp to_mod_series(%NaiveDateTime{} = value, %{dtype: {dtype_base, _}}, mod)
when dtype_base in [:datetime, :duration],
do: mod.from_list([value], {:datetime, :microsecond})

defp to_mod_series(value, %{dtype: :category}, mod),
do: mod.from_list([value], :string)

Expand Down
16 changes: 16 additions & 0 deletions lib/explorer/polars_backend/shared.ex
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ defmodule Explorer.PolarsBackend.Shared do
:date -> Native.s_from_list_date(name, list)
:time -> Native.s_from_list_time(name, list)
{:datetime, precision} -> Native.s_from_list_datetime(name, list, Atom.to_string(precision))
{:duration, precision} -> Native.s_from_list_duration(name, list, Atom.to_string(precision))
:binary -> Native.s_from_list_binary(name, list)
end
end
Expand All @@ -134,6 +135,15 @@ defmodule Explorer.PolarsBackend.Shared do
{:datetime, :nanosecond} ->
Native.s_from_binary_i64(name, binary) |> Native.s_cast("datetime[ns]") |> ok()

{:duration, :millisecond} ->
Native.s_from_binary_i64(name, binary) |> Native.s_cast("duration[ms]") |> ok()

{:duration, :microsecond} ->
Native.s_from_binary_i64(name, binary) |> Native.s_cast("duration[μs]") |> ok()

{:duration, :nanosecond} ->
Native.s_from_binary_i64(name, binary) |> Native.s_cast("duration[ns]") |> ok()

:integer ->
Native.s_from_binary_i64(name, binary)

Expand All @@ -152,6 +162,9 @@ defmodule Explorer.PolarsBackend.Shared do
def normalise_dtype("datetime[ms]"), do: {:datetime, :millisecond}
def normalise_dtype("datetime[ns]"), do: {:datetime, :nanosecond}
def normalise_dtype("datetime[μs]"), do: {:datetime, :microsecond}
def normalise_dtype("duration[ms]"), do: {:duration, :millisecond}
def normalise_dtype("duration[ns]"), do: {:duration, :nanosecond}
def normalise_dtype("duration[μs]"), do: {:duration, :microsecond}
def normalise_dtype("f64"), do: :float
def normalise_dtype("i64"), do: :integer
def normalise_dtype("list[u32]"), do: :integer
Expand All @@ -165,6 +178,9 @@ defmodule Explorer.PolarsBackend.Shared do
def internal_from_dtype({:datetime, :millisecond}), do: "datetime[ms]"
def internal_from_dtype({:datetime, :nanosecond}), do: "datetime[ns]"
def internal_from_dtype({:datetime, :microsecond}), do: "datetime[μs]"
def internal_from_dtype({:duration, :millisecond}), do: "duration[ms]"
def internal_from_dtype({:duration, :nanosecond}), do: "duration[ns]"
def internal_from_dtype({:duration, :microsecond}), do: "duration[μs]"
def internal_from_dtype(:float), do: "f64"
def internal_from_dtype(:integer), do: "i64"
def internal_from_dtype(:string), do: "str"
Expand Down
Loading

0 comments on commit 6836f29

Please sign in to comment.