fix(rust): timestamp deserialization format not following protocol + missing timestampNtz deserialization (#2383)

# Description
Our timestamp deserialization format didn't include `%6f`, so a value with fractional seconds such as `1970-01-01 00:00:00.123456` could not be decoded. Additionally, the `timestampNtz` primitive type had no deserialization at all. :)


- fixes #2380
- fixes #2381
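
For context, a minimal repro of the parsing gap (a sketch, assuming chrono 0.4; the two format strings mirror the ones used in this patch):

```rust
use chrono::NaiveDateTime;

fn main() {
    let value = "1970-01-01 00:00:00.123456";

    // The old format string has no fractional-seconds directive, so chrono
    // rejects the trailing ".123456" and parsing fails.
    assert!(NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S").is_err());

    // With ".%6f" (a literal dot plus six fractional digits) the value parses.
    let ts = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S.%6f").unwrap();
    assert_eq!(ts.and_utc().timestamp_micros(), 123_456);
}
```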
ion-elgreco authored Apr 15, 2024
1 parent d49d95b commit faa743a
Showing 3 changed files with 98 additions and 11 deletions.
11 changes: 5 additions & 6 deletions crates/core/src/delta_datafusion/expr.rs
@@ -27,7 +27,7 @@ use std::{
 };
 
 use arrow_schema::DataType;
-use chrono::{Date, NaiveDate, NaiveDateTime, TimeZone};
+use chrono::{DateTime, NaiveDate};
 use datafusion::execution::context::SessionState;
 use datafusion_common::Result as DFResult;
 use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
@@ -361,7 +361,7 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {
                Some(e) => write!(
                    f,
                    "'{}'::date",
-                   NaiveDateTime::from_timestamp_millis((*e).into())
+                   DateTime::from_timestamp_millis((*e).into())
                        .ok_or(Error::default())?
                        .date()
                        .format("%Y-%m-%d")
@@ -370,18 +370,17 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {
            },
            ScalarValue::TimestampMicrosecond(e, tz) => match e {
                Some(e) => match tz {
-                   Some(tz) => write!(
+                   Some(_tz) => write!(
                        f,
                        "arrow_cast('{}', 'Timestamp(Microsecond, Some(\"UTC\"))')",
-                       NaiveDateTime::from_timestamp_micros(*e)
+                       DateTime::from_timestamp_micros(*e)
                            .ok_or(Error::default())?
-                           .and_utc()
                            .format("%Y-%m-%dT%H:%M:%S%.6f")
                    )?,
                    None => write!(
                        f,
                        "arrow_cast('{}', 'Timestamp(Microsecond, None)')",
-                       NaiveDateTime::from_timestamp_micros(*e)
+                       DateTime::from_timestamp_micros(*e)
                            .ok_or(Error::default())?
                            .format("%Y-%m-%dT%H:%M:%S%.6f")
                    )?,
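
These hunks also migrate from the now-deprecated `NaiveDateTime::from_timestamp_*` constructors to `DateTime::from_timestamp_*`. Since the latter return `Option<DateTime<Utc>>` directly, the `Some(tz)` branch can drop its `.and_utc()` call. A small sketch of the new construction (assuming chrono 0.4.35+, where these constructors are available):

```rust
use chrono::DateTime;

fn main() {
    // DateTime::from_timestamp_micros yields Option<DateTime<Utc>>,
    // so the value can be formatted as UTC without an .and_utc() hop.
    let ts = DateTime::from_timestamp_micros(123_456).expect("in range");
    assert_eq!(
        ts.format("%Y-%m-%dT%H:%M:%S%.6f").to_string(),
        "1970-01-01T00:00:00.123456"
    );
}
```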
18 changes: 13 additions & 5 deletions crates/core/src/protocol/checkpoints.rs
@@ -6,7 +6,7 @@ use std::iter::Iterator;
 use arrow_json::ReaderBuilder;
 use arrow_schema::ArrowError;
 
-use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, Utc};
+use chrono::{Datelike, NaiveDate, NaiveDateTime, Utc};
 use futures::{StreamExt, TryStreamExt};
 use lazy_static::lazy_static;
 use object_store::{Error, ObjectStore};
@@ -440,10 +440,15 @@ fn typed_partition_value_from_string(
            // day 0 is 1970-01-01 (719163 days from ce)
            Ok((d.num_days_from_ce() - 719_163).into())
        }
-       PrimitiveType::Timestamp => {
-           let ts = NaiveDateTime::parse_from_str(string_value, "%Y-%m-%d %H:%M:%S").map_err(
-               |_| CheckpointError::PartitionValueNotParseable(string_value.to_owned()),
-           )?;
+       PrimitiveType::Timestamp | PrimitiveType::TimestampNtz => {
+           let ts = NaiveDateTime::parse_from_str(string_value, "%Y-%m-%d %H:%M:%S.%6f");
+           let ts: NaiveDateTime = match ts {
+               Ok(_) => ts,
+               Err(_) => NaiveDateTime::parse_from_str(string_value, "%Y-%m-%d %H:%M:%S"),
+           }
+           .map_err(|_| {
+               CheckpointError::PartitionValueNotParseable(string_value.to_owned())
+           })?;
            Ok((ts.and_utc().timestamp_millis() * 1000).into())
        }
        s => unimplemented!(
@@ -744,8 +749,11 @@ mod tests {
        }
 
        for (s, v) in [
+           ("2021-08-08 01:00:01.000000", 1628384401000000i64),
            ("2021-08-08 01:00:01", 1628384401000000i64),
+           ("1970-01-02 12:59:59.000000", 133199000000i64),
            ("1970-01-02 12:59:59", 133199000000i64),
+           ("1970-01-01 13:00:01.000000", 46801000000i64),
            ("1970-01-01 13:00:01", 46801000000i64),
            ("1969-12-31 00:00:00", -86400000000i64),
            ("1677-09-21 00:12:44", -9223372036000000i64),
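
The new `PrimitiveType::Timestamp | PrimitiveType::TimestampNtz` arm tries the fractional format first and falls back to second precision. A condensed equivalent using `Result::or_else`, checked against the first test vector above (a sketch; `parse_partition_timestamp` is a hypothetical helper, not the committed code):

```rust
use chrono::NaiveDateTime;

fn parse_partition_timestamp(s: &str) -> Option<i64> {
    // Try the fractional-seconds format first, then fall back to
    // second precision, mirroring the patched match arm.
    NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S.%6f")
        .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S"))
        .ok()
        // Millisecond precision scaled up to microseconds, as in the patch.
        .map(|ts| ts.and_utc().timestamp_millis() * 1000)
}

fn main() {
    // "2021-08-08 01:00:01" is 1_628_384_401 s after the epoch,
    // i.e. 1628384401000000 µs -- the expected value in the test table.
    assert_eq!(
        parse_partition_timestamp("2021-08-08 01:00:01.000000"),
        Some(1_628_384_401_000_000)
    );
    assert_eq!(
        parse_partition_timestamp("2021-08-08 01:00:01"),
        Some(1_628_384_401_000_000)
    );
}
```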
80 changes: 80 additions & 0 deletions python/tests/test_checkpoint.py
@@ -2,9 +2,11 @@
 import os
 import pathlib
 import shutil
+from datetime import date, datetime, timedelta
 
 import pyarrow as pa
 import pyarrow.parquet as pq
+import pytest
 
 from deltalake import DeltaTable, write_deltalake

@@ -169,6 +171,84 @@ def test_features_null_on_below_v3_v7(tmp_path: pathlib.Path):
     assert checkpoint["protocol"][0]["readerFeatures"].as_py() is None
 
 
+@pytest.fixture
+def sample_all_types():
+    from datetime import timezone
+
+    nrows = 5
+    return pa.table(
+        {
+            "utf8": pa.array([str(x) for x in range(nrows)]),
+            "int64": pa.array(list(range(nrows)), pa.int64()),
+            "int32": pa.array(list(range(nrows)), pa.int32()),
+            "int16": pa.array(list(range(nrows)), pa.int16()),
+            "int8": pa.array(list(range(nrows)), pa.int8()),
+            "float32": pa.array([float(x) for x in range(nrows)], pa.float32()),
+            "float64": pa.array([float(x) for x in range(nrows)], pa.float64()),
+            "bool": pa.array([x % 2 == 0 for x in range(nrows)]),
+            "binary": pa.array([str(x).encode() for x in range(nrows)]),
+            # "decimal": pa.array([Decimal("10.000") + x for x in range(nrows)]),  # Some issue with decimal and Rust engine at the moment.
+            "date32": pa.array(
+                [date(2022, 1, 1) + timedelta(days=x) for x in range(nrows)]
+            ),
+            "timestampNtz": pa.array(
+                [datetime(2022, 1, 1) + timedelta(hours=x) for x in range(nrows)]
+            ),
+            "timestamp": pa.array(
+                [
+                    datetime(2022, 1, 1, tzinfo=timezone.utc) + timedelta(hours=x)
+                    for x in range(nrows)
+                ]
+            ),
+            "struct": pa.array([{"x": x, "y": str(x)} for x in range(nrows)]),
+            "list": pa.array([list(range(x + 1)) for x in range(nrows)]),
+        }
+    )
+
+
+@pytest.mark.parametrize(
+    "engine,part_col",
+    [
+        ("rust", "timestampNtz"),
+        ("rust", "timestamp"),
+        ("pyarrow", "timestampNtz"),
+        pytest.param(
+            "pyarrow",
+            "timestamp",
+            marks=pytest.mark.skip(
+                "Pyarrow serialization of UTC datetimes is incorrect, it appends a 'Z' at the end."
+            ),
+        ),
+    ],
+)
+def test_checkpoint_partition_timestamp_2380(
+    tmp_path: pathlib.Path, sample_all_types: pa.Table, part_col: str, engine: str
+):
+    tmp_table_path = tmp_path / "path" / "to" / "table"
+    checkpoint_path = tmp_table_path / "_delta_log" / "_last_checkpoint"
+    last_checkpoint_path = (
+        tmp_table_path / "_delta_log" / "00000000000000000000.checkpoint.parquet"
+    )
+
+    # TODO: Include binary after fixing issue "Json error: binary type is not supported"
+    sample_data = sample_all_types.drop(["binary"])
+    write_deltalake(
+        str(tmp_table_path),
+        sample_data,
+        partition_by=[part_col],
+        engine=engine,  # type: ignore
+    )
+
+    assert not checkpoint_path.exists()
+
+    delta_table = DeltaTable(str(tmp_table_path))
+
+    delta_table.create_checkpoint()
+
+    assert last_checkpoint_path.exists()
+    assert checkpoint_path.exists()
+
+
 def test_checkpoint_post_commit_config(tmp_path: pathlib.Path, sample_data: pa.Table):
     """Checks whether checkpoints are properly written based on commit_interval"""
     tmp_table_path = tmp_path / "path" / "to" / "table"
