Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose function to get table of add actions #1033

Merged
merged 13 commits into from
Jan 11, 2023
34 changes: 34 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,37 @@ def __stringify_partition_values(
str_value = str(value)
out.append((field, op, str_value))
return out

def get_add_actions(self, flatten: bool = False) -> pyarrow.RecordBatch:
"""
Return a dataframe with all current add actions.

Add actions represent the files that currently make up the table. This
data is a low-level representation parsed from the transaction log.

:param flatten: whether to flatten the schema. Partition values columns are
given the prefix `partition.`, statistics (null_count, min, and max) are
given the prefix `null_count.`, `min.`, and `max.`, and tags the
prefix `tags.`. Nested field names are concatenated with `.`.

:returns: a PyArrow RecordBatch containing the add action data.

Examples:

>>> from deltalake import DeltaTable, write_deltalake
>>> import pyarrow as pa
>>> data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> write_deltalake("tmp", data, partition_by=["x"])
>>> dt = DeltaTable("tmp")
>>> dt.get_add_actions_df().to_pandas()
path size_bytes modification_time data_change partition_values num_records null_count min max
0 x=2/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True {'x': 2} 1 {'y': 0} {'y': 5} {'y': 5}
1 x=3/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True {'x': 3} 1 {'y': 0} {'y': 6} {'y': 6}
2 x=1/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True {'x': 1} 1 {'y': 0} {'y': 4} {'y': 4}
>>> dt.get_add_actions_df(flatten=True).to_pandas()
path size_bytes modification_time data_change partition.x num_records null_count.y min.y max.y
0 x=2/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 2 1 0 5 5
1 x=3/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 3 1 0 6 6
2 x=1/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 1 1 0 4 4
"""
return self._table.get_add_actions(flatten)
9 changes: 9 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,15 @@ impl RawDeltaTable {

Ok(())
}

pub fn get_add_actions(&self, flatten: bool) -> PyResult<PyArrowType<RecordBatch>> {
Ok(PyArrowType(
self._table
.get_state()
.add_actions_table(flatten)
.map_err(PyDeltaTableError::from_raw)?,
))
}
}

fn convert_partition_filters<'a>(
Expand Down
39 changes: 39 additions & 0 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,45 @@ def test_history_partitioned_table_metadata():
}


@pytest.mark.parametrize("flatten", [True, False])
def test_add_actions_table(flatten: bool):
table_path = "../rust/tests/data/delta-0.8.0-partitioned"
dt = DeltaTable(table_path)
actions_df = dt.get_add_actions(flatten)
# RecordBatch doesn't have a sort_by method yet
actions_df = pa.Table.from_batches([actions_df]).sort_by("path").to_batches()[0]

assert actions_df.num_rows == 6
assert actions_df["path"] == pa.array(
[
"year=2020/month=1/day=1/part-00000-8eafa330-3be9-4a39-ad78-fd13c2027c7e.c000.snappy.parquet",
"year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet",
"year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet",
"year=2021/month=12/day=20/part-00000-9275fdf4-3961-4184-baa0-1c8a2bb98104.c000.snappy.parquet",
"year=2021/month=12/day=4/part-00000-6dc763c0-3e8b-4d52-b19e-1f92af3fbb25.c000.snappy.parquet",
"year=2021/month=4/day=5/part-00000-c5856301-3439-4032-a6fc-22b7bc92bebb.c000.snappy.parquet",
]
)
assert actions_df["size_bytes"] == pa.array([414, 414, 414, 407, 414, 414])
assert actions_df["data_change"] == pa.array([True] * 6)
assert actions_df["modification_time"] == pa.array(
[1615555646000] * 6, type=pa.timestamp("ms")
)

if flatten:
partition_year = actions_df["partition.year"]
partition_month = actions_df["partition.month"]
partition_day = actions_df["partition.day"]
else:
partition_year = actions_df["partition_values"].field("year")
partition_month = actions_df["partition_values"].field("month")
partition_day = actions_df["partition_values"].field("day")

assert partition_year == pa.array(["2020"] * 3 + ["2021"] * 3)
assert partition_month == pa.array(["1", "2", "2", "12", "12", "4"])
assert partition_day == pa.array(["1", "3", "5", "20", "4", "5"])


def assert_correct_files(dt: DeltaTable, partition_filters, expected_paths):
assert dt.files(partition_filters) == expected_paths
absolute_paths = [os.path.join(dt.table_uri, path) for path in expected_paths]
Expand Down
1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
cfg-if = "1"
errno = "0.2"
futures = "0.3"
itertools = "0.10"
lazy_static = "1"
log = "0"
libc = ">=0.2.90, <1"
Expand Down
4 changes: 4 additions & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@

#![deny(warnings)]
#![deny(missing_docs)]
#![allow(rustdoc::invalid_html_tags)]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need this in order to build the docs for some reason. Newer lint?


#[cfg(all(feature = "parquet", feature = "parquet2"))]
compile_error!(
Expand All @@ -92,6 +93,9 @@ pub mod table_properties;
pub mod table_state;
pub mod time_utils;

#[cfg(all(feature = "arrow"))]
pub mod table_state_arrow;

#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod checkpoints;
#[cfg(all(feature = "arrow", feature = "parquet"))]
Expand Down
1 change: 0 additions & 1 deletion rust/src/table_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,6 @@ impl DeltaTableState {
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use std::collections::HashMap;

#[test]
fn state_round_trip() {
Expand Down
Loading