Handle pandas timestamps #958

Merged
merged 15 commits into from
Dec 1, 2022
22 changes: 21 additions & 1 deletion python/deltalake/writer.py
@@ -132,8 +132,28 @@ def write_deltalake(
    :param overwrite_schema: If True, allows updating the schema of the table.
    :param storage_options: options passed to the native delta filesystem. Unused if 'filesystem' is defined.
    """

    if _has_pandas and isinstance(data, pd.DataFrame):
        data = pa.Table.from_pandas(data)
        if schema is not None:
            data = pa.Table.from_pandas(data, schema=schema)
wjones127 marked this conversation as resolved.
        else:
            _data = pa.Table.from_pandas(data)
            _schema = _data.schema
            schema_out = []
            for _field in _schema:
Collaborator

Would you be willing to pull this out into a helper function (_delta_arrow_schema_from_pandas)?

Contributor Author

Really like this idea. Factored this out into a helper function, but I'd prefer it to be public rather than private. Would like to leverage this helper function elsewhere. Thoughts?

Collaborator

Sure, but if you make it public, then I'd suggest putting it in schema.py.

Contributor Author

Done.

                # partially handles https://github.com/delta-io/delta-rs/issues/686
                if isinstance(_field.type, pa.lib.TimestampType):
                    f = pa.field(
                        name=_field.name,
                        type=pa.timestamp("us"),
                        nullable=_field.nullable,
                        metadata=_field.metadata,
                    )
                    schema_out.append(f)
                else:
                    schema_out.append(_field)
            schema = pa.schema(schema_out, metadata=_schema.metadata)
            data = pa.Table.from_pandas(data, schema=schema)

    if schema is None:
        if isinstance(data, RecordBatchReader):
3 changes: 1 addition & 2 deletions python/tests/test_writer.py
@@ -275,9 +275,8 @@ def test_fails_wrong_partitioning(existing_table: DeltaTable, sample_data: pa.Ta
def test_write_pandas(tmp_path: pathlib.Path, sample_data: pa.Table):
    # When timestamp is converted to Pandas, it gets cast to ns resolution,
    # but Delta Lake schemas only support us resolution.
    sample_pandas = sample_data.to_pandas().drop(["timestamp"], axis=1)
    sample_pandas = sample_data.to_pandas()
    write_deltalake(str(tmp_path), sample_pandas)

    delta_table = DeltaTable(str(tmp_path))
    df = delta_table.to_pandas()
    assert_frame_equal(df, sample_pandas)