Skip to content

Commit

Permalink
Re-enable datafusion tests and improve supported types. (#601)
Browse files Browse the repository at this point in the history
* re-enable tests. 
* fix write operation 
* add support for more datatypes with datafusion
* remove panics
  • Loading branch information
roeap authored May 10, 2022
1 parent a922daa commit 359c2fa
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 132 deletions.
159 changes: 79 additions & 80 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: build

on:
push:
branches: [ main, 'rust-v*' ]
branches: [main, "rust-v*"]
pull_request:
branches: [ main, 'rust-v*']
branches: [main, "rust-v*"]

defaults:
run:
Expand All @@ -14,15 +14,15 @@ jobs:
format:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install minimal stable with clippy and rustfmt
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: stable
override: true
- name: Format
run: cargo fmt -- --check
- uses: actions/checkout@v2
- name: Install minimal stable with clippy and rustfmt
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: stable
override: true
- name: Format
run: cargo fmt -- --check

build:
strategy:
Expand All @@ -34,29 +34,29 @@ jobs:
- windows-2019
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- uses: actions/cache@v2
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
restore-keys: |
${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
${{ runner.os }}-cargo-
- name: Install minimal stable with clippy and rustfmt
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: stable
override: true
- name: build and lint with clippy
run: cargo clippy --features azure,datafusion-ext,s3
- name: Spot-check build for rustls features
run: cargo clippy --features s3-rustls
- uses: actions/checkout@v2
- uses: actions/cache@v2
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
restore-keys: |
${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
${{ runner.os }}-cargo-
- name: Install minimal stable with clippy and rustfmt
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: stable
override: true
- name: build and lint with clippy
run: cargo clippy --features azure,datafusion-ext,s3
- name: Spot-check build for rustls features
run: cargo clippy --features s3-rustls

test:
strategy:
Expand All @@ -68,53 +68,52 @@ jobs:
- windows-2019
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- uses: actions/cache@v2
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: test-${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
restore-keys: |
test-${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
test-${{ runner.os }}-cargo-
- name: Install minimal stable with clippy and rustfmt
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: stable
override: true
- name: Run tests
#run: cargo test --verbose --features datafusion-ext,azure Comment out datafusion until https://github.com/apache/arrow-datafusion/pull/984 is merged
run: cargo test --verbose --features azure
- uses: actions/checkout@v2
- uses: actions/cache@v2
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: test-${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
restore-keys: |
test-${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
test-${{ runner.os }}-cargo-
- name: Install minimal stable with clippy and rustfmt
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: stable
override: true
- name: Run tests
run: cargo test --verbose --features datafusion-ext,azure

s3_test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/cache@v2
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: s3-test-${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
restore-keys: |
s3-test-${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
s3-test-${{ runner.os }}-cargo-
- name: Install minimal stable with clippy and rustfmt
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: stable
override: true
- name: Setup localstack
run: docker-compose up setup
- name: Run tests
run: |
cargo test --features s3
- uses: actions/checkout@v2
- uses: actions/cache@v2
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: s3-test-${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
restore-keys: |
s3-test-${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock', '**/Cargo.toml') }}
s3-test-${{ runner.os }}-cargo-
- name: Install minimal stable with clippy and rustfmt
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: stable
override: true
- name: Setup localstack
run: docker-compose up setup
- name: Run tests
run: |
cargo test --features s3
110 changes: 69 additions & 41 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl delta::DeltaTable {
if left_larger_than_right(
old_max_value.clone(),
max_value.clone(),
) {
)? {
Some(old_max_value)
} else {
Some(max_value)
Expand All @@ -125,7 +125,7 @@ impl delta::DeltaTable {
if left_larger_than_right(
min_value.clone(),
old_min_value.clone(),
) {
)? {
Some(old_min_value)
} else {
Some(min_value)
Expand Down Expand Up @@ -271,16 +271,21 @@ impl TableProvider for delta::DeltaTable {
}

fn to_scalar_value(stat_val: &serde_json::Value) -> Option<datafusion::scalar::ScalarValue> {
if stat_val.is_number() {
if let Some(val) = stat_val.as_i64() {
Some(ScalarValue::from(val))
} else if let Some(val) = stat_val.as_u64() {
Some(ScalarValue::from(val))
} else {
stat_val.as_f64().map(ScalarValue::from)
match stat_val {
serde_json::Value::Bool(val) => Some(ScalarValue::from(*val)),
serde_json::Value::Number(num) => {
if let Some(val) = num.as_i64() {
Some(ScalarValue::from(val))
} else if let Some(val) = num.as_u64() {
Some(ScalarValue::from(val))
} else {
num.as_f64().map(ScalarValue::from)
}
}
} else {
None
serde_json::Value::String(s) => Some(ScalarValue::from(s.as_str())),
serde_json::Value::Array(_) => None,
serde_json::Value::Object(_) => None,
serde_json::Value::Null => None,
}
}

Expand All @@ -290,51 +295,63 @@ fn correct_scalar_value_type(
) -> Option<datafusion::scalar::ScalarValue> {
match field_dt {
ArrowDataType::Int64 => {
let raw_value = i64::try_from(value).unwrap();
let raw_value = i64::try_from(value).ok()?;
Some(ScalarValue::from(raw_value))
}
ArrowDataType::Int32 => {
let raw_value = i64::try_from(value).unwrap() as i32;
let raw_value = i64::try_from(value).ok()? as i32;
Some(ScalarValue::from(raw_value))
}
ArrowDataType::Int16 => {
let raw_value = i64::try_from(value).unwrap() as i16;
let raw_value = i64::try_from(value).ok()? as i16;
Some(ScalarValue::from(raw_value))
}
ArrowDataType::Int8 => {
let raw_value = i64::try_from(value).unwrap() as i8;
let raw_value = i64::try_from(value).ok()? as i8;
Some(ScalarValue::from(raw_value))
}
ArrowDataType::Float32 => {
let raw_value = f64::try_from(value).unwrap() as f32;
let raw_value = f64::try_from(value).ok()? as f32;
Some(ScalarValue::from(raw_value))
}
ArrowDataType::Float64 => {
let raw_value = f64::try_from(value).unwrap();
let raw_value = f64::try_from(value).ok()?;
Some(ScalarValue::from(raw_value))
}
ArrowDataType::Utf8 => match value {
ScalarValue::Utf8(val) => Some(ScalarValue::Utf8(val)),
_ => None,
},
ArrowDataType::LargeUtf8 => match value {
ScalarValue::Utf8(val) => Some(ScalarValue::LargeUtf8(val)),
_ => None,
},
ArrowDataType::Boolean => {
let raw_value = bool::try_from(value).ok()?;
Some(ScalarValue::from(raw_value))
}
ArrowDataType::Decimal(_, _) => {
let raw_value = f64::try_from(value).unwrap();
let raw_value = f64::try_from(value).ok()?;
Some(ScalarValue::from(raw_value))
}
ArrowDataType::Date32 => {
let raw_value = i64::try_from(value).unwrap() as i32;
let raw_value = i64::try_from(value).ok()? as i32;
Some(ScalarValue::Date32(Some(raw_value)))
}
ArrowDataType::Date64 => {
let raw_value = i64::try_from(value).unwrap();
let raw_value = i64::try_from(value).ok()?;
Some(ScalarValue::Date64(Some(raw_value)))
}
ArrowDataType::Timestamp(TimeUnit::Nanosecond, None) => {
let raw_value = i64::try_from(value).unwrap();
let raw_value = i64::try_from(value).ok()?;
Some(ScalarValue::TimestampNanosecond(Some(raw_value), None))
}
ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => {
let raw_value = i64::try_from(value).unwrap();
let raw_value = i64::try_from(value).ok()?;
Some(ScalarValue::TimestampMicrosecond(Some(raw_value), None))
}
ArrowDataType::Timestamp(TimeUnit::Millisecond, None) => {
let raw_value = i64::try_from(value).unwrap();
let raw_value = i64::try_from(value).ok()?;
Some(ScalarValue::TimestampMillisecond(Some(raw_value), None))
}
_ => {
Expand All @@ -351,36 +368,47 @@ fn correct_scalar_value_type(
fn left_larger_than_right(
left: datafusion::scalar::ScalarValue,
right: datafusion::scalar::ScalarValue,
) -> bool {
) -> Option<bool> {
match left {
ScalarValue::Float64(Some(v)) => {
let f_right = f64::try_from(right).unwrap();
v > f_right
let f_right = f64::try_from(right).ok()?;
Some(v > f_right)
}
ScalarValue::Float32(Some(v)) => {
let f_right = f32::try_from(right).unwrap();
v > f_right
let f_right = f32::try_from(right).ok()?;
Some(v > f_right)
}
ScalarValue::Int8(Some(v)) => {
let i_right = i8::try_from(right).unwrap();
v > i_right
let i_right = i8::try_from(right).ok()?;
Some(v > i_right)
}
ScalarValue::Int16(Some(v)) => {
let i_right = i16::try_from(right).unwrap();
v > i_right
let i_right = i16::try_from(right).ok()?;
Some(v > i_right)
}
ScalarValue::Int32(Some(v)) => {
let i_right = i32::try_from(right).unwrap();
v > i_right
let i_right = i32::try_from(right).ok()?;
Some(v > i_right)
}
ScalarValue::Int64(Some(v)) => {
let i_right = i64::try_from(right).unwrap();
v > i_right
let i_right = i64::try_from(right).ok()?;
Some(v > i_right)
}
ScalarValue::Boolean(Some(v)) => {
let b_right = bool::try_from(right).ok()?;
Some(v & !b_right)
}
ScalarValue::Utf8(Some(v)) => match right {
ScalarValue::Utf8(Some(s_right)) => Some(v > s_right),
_ => None,
},
_ => {
log::error!(
"Scalar value comparison unimplemented for {:?} and {:?}",
left,
right
);
None
}
_ => unimplemented!(
"Scalar value comparison unimplemented for {:?} and {:?}",
left,
right
),
}
}
Loading

0 comments on commit 359c2fa

Please sign in to comment.