Merge branch 'main' into feat/allow_multiple_when_clauses_merge
ion-elgreco authored Nov 4, 2023
2 parents 3cd8b06 + 45e7841 commit b7e84bc
Showing 10 changed files with 187 additions and 31 deletions.
64 changes: 64 additions & 0 deletions .github/workflows/docs.yml
@@ -0,0 +1,64 @@
+name: Build documentation
+
+on:
+  pull_request:
+    paths:
+      - python/**
+      - docs/**
+      - mkdocs.yml
+      - .github/workflows/docs.yml
+jobs:
+  markdown-link-check:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - uses: gaurav-nelson/github-action-markdown-link-check@v1
+        with:
+          config-file: docs/mlc_config.json
+          folder-path: docs
+
+  lint:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - uses: psf/black@stable
+        with:
+          src: docs/src/python
+
+  build:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+          toolchain: stable
+          override: true
+          components: rustfmt, clippy
+
+      - uses: Swatinem/rust-cache@v2
+
+      - name: Set up Python
+        uses: actions/setup-python@v3
+        with:
+          python-version: '3.10'
+
+      - name: Build and install deltalake
+        run: |
+          cd python
+          pip install virtualenv
+          virtualenv venv
+          source venv/bin/activate
+          make develop
+          cd ..
+      - name: Install dependencies
+        run: |
+          source python/venv/bin/activate
+          pip install -r docs/requirements.txt
+      - name: Build documentation
+        run: |
+          source python/venv/bin/activate
+          mkdocs build
3 changes: 2 additions & 1 deletion .gitignore
@@ -28,4 +28,5 @@ Cargo.lock
 !/proofs/Cargo.lock
 
 justfile
-site
\ No newline at end of file
+site
+__pycache__
111 changes: 91 additions & 20 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -586,8 +586,14 @@ impl<'a> DeltaScanBuilder<'a> {
         // However we may want to do some additional balancing in case we are far off from the above.
         let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
 
+        let table_partition_cols = &self
+            .snapshot
+            .current_metadata()
+            .ok_or(DeltaTableError::NoMetadata)?
+            .partition_columns;
+
         for action in files.iter() {
-            let mut part = partitioned_file_from_action(action, &schema);
+            let mut part = partitioned_file_from_action(action, table_partition_cols, &schema);
 
             if config.file_column_name.is_some() {
                 part.partition_values
@@ -602,13 +608,6 @@ impl<'a> DeltaScanBuilder<'a> {
                 .push(part);
         }
 
-        let table_partition_cols = self
-            .snapshot
-            .current_metadata()
-            .ok_or(DeltaTableError::NoMetadata)?
-            .partition_columns
-            .clone();
-
         let file_schema = Arc::new(ArrowSchema::new(
             schema
                 .fields()
@@ -923,20 +922,30 @@ pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
 
 pub(crate) fn partitioned_file_from_action(
     action: &protocol::Add,
+    partition_columns: &[String],
     schema: &ArrowSchema,
 ) -> PartitionedFile {
-    let partition_values = schema
-        .fields()
+    let partition_values = partition_columns
         .iter()
-        .filter_map(|f| {
-            action.partition_values.get(f.name()).map(|val| match val {
-                Some(value) => to_correct_scalar_value(
-                    &serde_json::Value::String(value.to_string()),
-                    f.data_type(),
-                )
-                .unwrap_or(ScalarValue::Null),
-                None => get_null_of_arrow_type(f.data_type()).unwrap_or(ScalarValue::Null),
-            })
+        .map(|part| {
+            action
+                .partition_values
+                .get(part)
+                .map(|val| {
+                    schema
+                        .field_with_name(part)
+                        .map(|field| match val {
+                            Some(value) => to_correct_scalar_value(
+                                &serde_json::Value::String(value.to_string()),
+                                field.data_type(),
+                            )
+                            .unwrap_or(ScalarValue::Null),
+                            None => get_null_of_arrow_type(field.data_type())
+                                .unwrap_or(ScalarValue::Null),
+                        })
+                        .unwrap_or(ScalarValue::Null)
+                })
+                .unwrap_or(ScalarValue::Null)
         })
         .collect::<Vec<_>>();

@@ -1618,6 +1627,7 @@ pub async fn find_files<'a>(
 
 #[cfg(test)]
 mod tests {
+    use crate::writer::test_utils::get_delta_schema;
     use arrow::array::StructArray;
     use arrow::datatypes::{DataType, Field, Schema};
     use chrono::{TimeZone, Utc};
@@ -1797,7 +1807,8 @@ mod tests {
             Field::new("month", ArrowDataType::Int64, true),
         ]);
 
-        let file = partitioned_file_from_action(&action, &schema);
+        let part_columns = vec!["year".to_string(), "month".to_string()];
+        let file = partitioned_file_from_action(&action, &part_columns, &schema);
         let ref_file = PartitionedFile {
             object_meta: object_store::ObjectMeta {
                 location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()),
@@ -1929,4 +1940,64 @@ mod tests {
         ];
         assert_batches_sorted_eq!(&expected, &actual);
     }
+
+    #[tokio::test]
+    async fn delta_scan_mixed_partition_order() {
+        // Tests issue (1787) where partition columns were incorrect when they
+        // have a different order in the metadata and table schema
+        let schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("modified", DataType::Utf8, true),
+            Field::new("id", DataType::Utf8, true),
+            Field::new("value", DataType::Int32, true),
+        ]));
+
+        let table = crate::DeltaOps::new_in_memory()
+            .create()
+            .with_columns(get_delta_schema().get_fields().clone())
+            .with_partition_columns(["modified", "id"])
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 0);
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(arrow::array::StringArray::from(vec![
+                    "2021-02-01",
+                    "2021-02-01",
+                    "2021-02-02",
+                    "2021-02-02",
+                ])),
+                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
+                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
+            ],
+        )
+        .unwrap();
+        // write some data
+        let table = crate::DeltaOps(table)
+            .write(vec![batch.clone()])
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap();
+
+        let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap();
+        let ctx = SessionContext::new();
+        ctx.register_table("test", Arc::new(provider)).unwrap();
+
+        let df = ctx.sql("select * from test").await.unwrap();
+        let actual = df.collect().await.unwrap();
+        let expected = vec![
+            "+-------+------------+----+",
+            "| value | modified   | id |",
+            "+-------+------------+----+",
+            "| 1     | 2021-02-01 | A  |",
+            "| 10    | 2021-02-01 | B  |",
+            "| 100   | 2021-02-02 | D  |",
+            "| 20    | 2021-02-02 | C  |",
+            "+-------+------------+----+",
+        ];
+        assert_batches_sorted_eq!(&expected, &actual);
+    }
 }
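
Note: the new test above pins down the behavior the refactor guarantees, namely that partition values are collected in the order given by the metadata's `partition_columns` rather than the table schema's field order. A minimal standalone Python sketch of the mismatch (names and values are illustrative, not the deltalake-core API):

```python
# Partition values recorded in an Add action, keyed by column name.
partition_values = {"modified": "2021-02-01", "id": "A"}

schema_order = ["modified", "id"]       # field order in the table schema
partition_columns = ["id", "modified"]  # order declared in the Delta metadata

# Old behavior: values collected by iterating schema fields.
old = [partition_values[c] for c in schema_order]
# Fixed behavior: values collected by iterating partition_columns,
# the order DataFusion's scan receives via table_partition_cols.
new = [partition_values[c] for c in partition_columns]

assert old == ["2021-02-01", "A"]  # wrong slots when the orders differ
assert new == ["A", "2021-02-01"]  # matches the declared partition order
```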
7 changes: 7 additions & 0 deletions docs/mlc_config.json
@@ -0,0 +1,7 @@
+{
+  "ignorePatterns": [
+    {
+      "pattern": "^https://crates.io/"
+    }
+  ]
+}
10 changes: 7 additions & 3 deletions docs/requirements.txt
@@ -1,3 +1,7 @@
-mkdocs
-mkdocstrings[python]
-mkdocs-autorefs
+mkdocs==1.5.3
+mkdocstrings[python]==0.23.0
+mkdocs-autorefs==0.5.0
+mkdocs-material==9.4.5
+mkdocs-macros-plugin==1.0.4
+markdown-exec[ansi]==1.7.0
+mkdocs-simple-hooks==0.1.5
2 changes: 1 addition & 1 deletion docs/src/python/delta_table.py
@@ -5,4 +5,4 @@ def get_table_info():
     dt = DeltaTable("../rust/tests/data/delta-0.2.0")
     print(f"Version: {dt.version()}")
     print(f"Files: {dt.files()}")
-    # --8<-- [end:get_table_info]
\ No newline at end of file
+    # --8<-- [end:get_table_info]
2 changes: 1 addition & 1 deletion docs/usage/querying-delta-tables.md
@@ -61,7 +61,7 @@ scanning operation.
 ```
 
 PyArrow datasets may also be passed to compatible query engines, such as
-[DuckDB](https://duckdb.org/docs/api/python).
+[DuckDB](https://duckdb.org/docs/api/python/overview.html).
 
 ``` python
 >>> import duckdb
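
The diff truncates the DuckDB snippet that follows. As a rough sketch of the pattern the page describes, assuming `deltalake` and `duckdb` are installed and a Delta table exists at the placeholder path `data/my_table` (column names are placeholders too):

```python
import duckdb
from deltalake import DeltaTable

# Placeholder path; any readable Delta table works here.
dt = DeltaTable("data/my_table")
dataset = dt.to_pyarrow_dataset()

# DuckDB queries the PyArrow dataset directly; column selection and
# simple filters are pushed down to the dataset scan.
rel = duckdb.arrow(dataset)
print(rel.filter("value > 1").project("id, value").fetchall())
```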
8 changes: 8 additions & 0 deletions mkdocs.yml
@@ -30,6 +30,14 @@ nav:
     - api/storage.md
 not_in_nav: |
   /_build/
+exclude_docs: |
+  /_build/
+  /mlc_config.json
+  /src
+  /requirements.txt
+  *.py
 plugins:
   - autorefs
   - mkdocstrings:
9 changes: 5 additions & 4 deletions python/deltalake/table.py
@@ -835,7 +835,7 @@ def repair(self, dry_run: bool = False) -> Dict[str, Any]:
         or corrupted files.
 
         Args:
-            dry_run(bool): when activated, list only the files, otherwise add remove actions to transaction log. Defaults to False.
+            dry_run: when activated, list only the files, otherwise add remove actions to transaction log. Defaults to False.
 
         Returns:
             The metrics from repair (FSCK) action.
@@ -844,9 +844,10 @@ def repair(self, dry_run: bool = False) -> Dict[str, Any]:
             from deltalake import DeltaTable
             dt = DeltaTable('TEST')
             dt.repair(dry_run=False)
-            {'dry_run': False,
-             'files_removed': ['6-0d084325-6885-4847-b008-82c1cf30674c-0.parquet',
-             '5-4fba1d3e-3e20-4de1-933d-a8e13ac59f53-0.parquet']}
             ```
+            Results in
+            ```
+            {'dry_run': False, 'files_removed': ['6-0d084325-6885-4847-b008-82c1cf30674c-0.parquet', '5-4fba1d3e-3e20-4de1-933d-a8e13ac59f53-0.parquet']}
+            ```
         """
         metrics = self._table.repair(dry_run)
2 changes: 1 addition & 1 deletion python/pyproject.toml
@@ -47,7 +47,7 @@ pyspark = [
 ]
 
 [project.urls]
-documentation = "https://delta-io.github.io/delta-rs/python/"
+documentation = "https://delta-io.github.io/delta-rs/"
 repository = "https://github.com/delta-io/delta-rs/tree/main/python/"
 
 [tool.mypy]
