Skip to content

Commit

Permalink
[FEAT] Support for Timestamp datatype. (#1032)
Browse files Browse the repository at this point in the history
Adds support for an Arrow-like Timestamp datatype in Daft. Timestamp
type is parameterized by (TimeUnit, TimeZone).

**Storage format**

- TimeUnit and TimeZone are stored in the field only. They are stored in
the Arrow format (TimeUnit enum, TimeZone Option<String>).
- Timestamps are stored in the Arrow format: i64 representing number of
TimeUnits from the UNIX epoch.

**String representation**

- All timestamps are printed in ISO 8601.
- The number of subsecond digits printed reflects the resolution of the
timestamp type.
- All timezones are printed in offset format (+-XX:XX) regardless of the
timezone.

**Ingestion**

- Lossless ingestion from PyArrow tables.
- Ingestion from Python datetime.datetime to Timestamp(us, tz).

**Casting**

- Cast to and from integer types.
- Cast to and from String type (ISO 8601 only).

**Expressions**

- Blackbox

**Drivebys**

Updated arrow_cast for Daft Physical -> Daft Logical.
- Previously, it cast directly to Arrow Physical.
- Now, it casts to Arrow Logical and then to Arrow Physical.

### Separate PR:

- Numeric expressions (requires Duration type)

---------

Co-authored-by: Xiayue Charles Lin <charles@eventualcomputing.com>
  • Loading branch information
xcharleslin and Xiayue Charles Lin authored Jun 14, 2023
1 parent 682dfed commit 6206c92
Show file tree
Hide file tree
Showing 23 changed files with 690 additions and 48 deletions.
164 changes: 163 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
[dependencies]
base64 = "0.21.2"
chrono = "0.4.26"
chrono-tz = "0.8.2"
dyn-clone = "1.0.11"
fnv = "1.0.7"
html-escape = "0.2.13"
Expand All @@ -12,7 +14,7 @@ serde_json = "1.0.96"

[dependencies.arrow2]
branch = "clark/expand-casting-support"
features = ["compute_take", "compute_cast", "compute_aggregate", "compute_if_then_else", "compute_sort", "compute_filter", "compute_temporal", "compute_comparison", "compute_arithmetics", "compute_concatenate", "io_ipc"]
features = ["chrono-tz", "compute_take", "compute_cast", "compute_aggregate", "compute_if_then_else", "compute_sort", "compute_filter", "compute_temporal", "compute_comparison", "compute_arithmetics", "compute_concatenate", "io_ipc"]
git = "https://github.com/Eventual-Inc/arrow2"
package = "arrow2"
version = "0.17.1"
Expand Down
3 changes: 2 additions & 1 deletion daft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class daft:
)
from daft.daft import ImageFormat
from daft.dataframe import DataFrame
from daft.datatype import DataType, ImageMode
from daft.datatype import DataType, ImageMode, TimeUnit
from daft.expressions import col, lit
from daft.io import from_glob_path, read_csv, read_json, read_parquet
from daft.series import Series
Expand All @@ -103,6 +103,7 @@ class daft:
"ImageFormat",
"lit",
"Series",
"TimeUnit",
"register_viz_hook",
"udf",
]
70 changes: 69 additions & 1 deletion daft/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pyarrow as pa

from daft.context import get_context
from daft.daft import ImageMode, PyDataType
from daft.daft import ImageMode, PyDataType, PyTimeUnit

_RAY_DATA_EXTENSIONS_AVAILABLE = True
_TENSOR_EXTENSION_TYPES = []
Expand All @@ -32,6 +32,66 @@
_RAY_DATA_EXTENSIONS_AVAILABLE = False


class TimeUnit:
    """Python-side wrapper around a ``PyTimeUnit``.

    Instances cannot be created with ``TimeUnit()``; use one of the named
    constructors (:meth:`s`, :meth:`ms`, :meth:`us`, :meth:`ns`) or
    :meth:`from_str` instead.
    """

    # The wrapped PyTimeUnit; set by ``_from_pytimeunit``.
    _timeunit: PyTimeUnit

    def __init__(self) -> None:
        """Always raise: direct construction is not supported.

        Raises:
            NotImplementedError: unconditionally.
        """
        raise NotImplementedError("Please use TimeUnit.from_str(), .s(), .ms(), .us(), or .ns() instead.")

    @staticmethod
    def _from_pytimeunit(o3: PyTimeUnit) -> TimeUnit:
        """Wrap an existing ``PyTimeUnit`` in a ``TimeUnit`` without calling ``__init__``."""
        # __new__ is used because __init__ deliberately raises.
        timeunit = TimeUnit.__new__(TimeUnit)
        timeunit._timeunit = o3
        return timeunit

    @classmethod
    def s(cls) -> TimeUnit:
        """Represents seconds."""
        return cls._from_pytimeunit(PyTimeUnit.seconds())

    @classmethod
    def ms(cls) -> TimeUnit:
        """Represents milliseconds."""
        return cls._from_pytimeunit(PyTimeUnit.milliseconds())

    @classmethod
    def us(cls) -> TimeUnit:
        """Represents microseconds."""
        return cls._from_pytimeunit(PyTimeUnit.microseconds())

    @classmethod
    def ns(cls) -> TimeUnit:
        """Represents nanoseconds."""
        return cls._from_pytimeunit(PyTimeUnit.nanoseconds())

    @classmethod
    def from_str(cls, unit: str) -> TimeUnit:
        """Build a ``TimeUnit`` from its string abbreviation.

        Args:
            unit: One of ``"s"``, ``"ms"``, ``"us"`` or ``"ns"``. The match is
                case-insensitive.

        Returns:
            The corresponding ``TimeUnit``.

        Raises:
            ValueError: if ``unit`` is not one of the supported abbreviations.
        """
        constructors = {
            "s": cls.s,
            "ms": cls.ms,
            "us": cls.us,
            "ns": cls.ns,
        }
        make = constructors.get(unit.lower())
        if make is None:
            # The original message lacked the f-prefix and printed a literal "{unit}".
            raise ValueError(f"Unsupported unit: {unit}")
        return make()

    def __str__(self) -> str:
        """Return the abbreviation for this unit: ``"s"``, ``"ms"``, ``"us"`` or ``"ns"``."""
        # These are the strings PyArrow uses.
        candidates = (
            ("s", PyTimeUnit.seconds),
            ("ms", PyTimeUnit.milliseconds),
            ("us", PyTimeUnit.microseconds),
            ("ns", PyTimeUnit.nanoseconds),
        )
        for label, make in candidates:
            if self._timeunit == make():
                return label
        # Raise explicitly rather than ``assert False``: asserts are stripped under
        # ``python -O``, which would make this method silently return None.
        raise AssertionError("TimeUnit wraps an unrecognized PyTimeUnit")


class DataType:
"""A Daft DataType defines the type of all the values in an Expression or DataFrame column"""

Expand Down Expand Up @@ -124,6 +184,11 @@ def date(cls) -> DataType:
"""Create a Date DataType: A date with a year, month and day"""
return cls._from_pydatatype(PyDataType.date())

@classmethod
def timestamp(cls, timeunit: TimeUnit, timezone: str | None = None) -> DataType:
    """Create a Timestamp DataType with the given time unit and optional timezone

    Args:
        timeunit: The ``TimeUnit`` of the timestamp type.
        timezone: The timezone of the timestamp type, or ``None`` (the default) for no timezone.
    """
    # Unwrap the TimeUnit to the PyTimeUnit the native constructor takes.
    py_dtype = PyDataType.timestamp(timeunit._timeunit, timezone)
    return cls._from_pydatatype(py_dtype)

@classmethod
def list(cls, name: str, dtype: DataType) -> DataType:
"""Create a List DataType: Variable-length list, where each element in the list has type ``dtype``
Expand Down Expand Up @@ -246,6 +311,9 @@ def from_arrow_type(cls, arrow_type: pa.lib.DataType) -> DataType:
return cls.null()
elif pa.types.is_date32(arrow_type):
return cls.date()
elif pa.types.is_timestamp(arrow_type):
timeunit = TimeUnit.from_str(arrow_type.unit)
return cls.timestamp(timeunit=timeunit, timezone=arrow_type.tz)
elif pa.types.is_list(arrow_type) or pa.types.is_large_list(arrow_type):
assert isinstance(arrow_type, (pa.ListType, pa.LargeListType))
field = arrow_type.value_field
Expand Down
Loading

0 comments on commit 6206c92

Please sign in to comment.