Skip to content

Commit

Permalink
feat(function): Support variant order by
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh committed May 30, 2022
1 parent b90b74e commit ddb4363
Show file tree
Hide file tree
Showing 23 changed files with 590 additions and 118 deletions.
259 changes: 170 additions & 89 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions common/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ simd = ["arrow/simd"]
arrow = { package = "arrow2", git = "https://github.com/datafuse-extras/arrow2", default-features = false, features = [
"io_parquet",
"io_parquet_compression",
], rev = "826a2b8" }
], rev = "48a3087" }

# Crates.io dependencies
arrow-format = { version = "0.4.0", features = ["flight-data", "flight-service", "ipc"] }
arrow-format = { version = "0.6.0", features = ["flight-data", "flight-service", "ipc"] }
futures = "0.3.21"
parquet2 = { version = "0.12", default_features = false }

Expand Down
2 changes: 2 additions & 0 deletions common/datablocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ common-io = { path = "../io" }
ahash = "0.7.6"
comfy-table = "5.0.1"
regex = "1.5.5"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"

[dev-dependencies]
pretty_assertions = "1.2.1"
44 changes: 42 additions & 2 deletions common/datablocks/src/kernels/data_block_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ use std::iter::once;
use std::sync::Arc;

use common_arrow::arrow::array::growable::make_growable;
use common_arrow::arrow::array::ord as arrow_ord;
use common_arrow::arrow::array::ord::DynComparator;
use common_arrow::arrow::array::Array;
use common_arrow::arrow::array::ArrayRef;
use common_arrow::arrow::compute::merge_sort::*;
use common_arrow::arrow::compute::sort as arrow_sort;
use common_arrow::arrow::datatypes::DataType as ArrowType;
use common_arrow::arrow::error::Error as ArrowError;
use common_arrow::arrow::error::Result as ArrowResult;
use common_datavalues::prelude::*;
use common_exception::ErrorCode;
use common_exception::Result;
Expand Down Expand Up @@ -58,7 +63,7 @@ impl DataBlock {
})
.collect::<Result<Vec<_>>>()?;

let indices = arrow_sort::lexsort_to_indices(&order_arrays, limit)?;
let indices = arrow_sort::lexsort_to_indices_impl(&order_arrays, limit, &build_compare)?;
DataBlock::block_take_by_indices(block, indices.values())
}

Expand Down Expand Up @@ -111,7 +116,7 @@ impl DataBlock {
})
.collect::<Vec<_>>();

let comparator = build_comparator(&sort_options_with_array)?;
let comparator = build_comparator_impl(&sort_options_with_array, &build_compare)?;
let lhs_indices = (0, 0, lhs.num_rows());
let rhs_indices = (1, 0, rhs.num_rows());
let slices = merge_sort_slices(once(&lhs_indices), once(&rhs_indices), &comparator);
Expand Down Expand Up @@ -203,3 +208,38 @@ impl DataBlock {
}
}
}

/// Builds a row comparator over two variant-typed arrow arrays.
///
/// Each input is wrapped in a `VariantColumn`. The returned closure takes a
/// row index into `left` and a row index into `right`, and orders the two
/// values obtained through `get_data` with their `cmp` implementation.
fn compare_variant(left: &dyn Array, right: &dyn Array) -> ArrowResult<DynComparator> {
    let lhs_column = VariantColumn::from_arrow_array(left);
    let rhs_column = VariantColumn::from_arrow_array(right);

    // The columns are moved into the closure so the comparator owns its data.
    Ok(Box::new(move |lhs_row, rhs_row| {
        let lhs_value = lhs_column.get_data(lhs_row);
        let rhs_value = rhs_column.get_data(rhs_row);
        lhs_value.cmp(rhs_value)
    }))
}

/// Builds a row comparator over two list-typed arrow arrays.
///
/// Each input is wrapped in an `ArrayColumn`. The returned closure takes a
/// row index into `left` and a row index into `right`, and orders the two
/// values obtained through `get_data` with their `cmp` implementation.
fn compare_array(left: &dyn Array, right: &dyn Array) -> ArrowResult<DynComparator> {
    let lhs_column = ArrayColumn::from_arrow_array(left);
    let rhs_column = ArrayColumn::from_arrow_array(right);

    // The columns are moved into the closure so the comparator owns its data.
    Ok(Box::new(move |lhs_row, rhs_row| {
        let lhs_value = lhs_column.get_data(lhs_row);
        let rhs_value = rhs_column.get_data(rhs_row);
        lhs_value.cmp(&rhs_value)
    }))
}

/// Selects a row comparator based on the arrow data type of `left`.
///
/// * `LargeList` arrays are handled by `compare_array`.
/// * `Extension` arrays named `Variant`, `VariantArray` or `VariantObject`
///   are handled by `compare_variant`; any other extension name yields
///   `ArrowError::NotYetImplemented`.
/// * Every other data type is delegated to `arrow_ord::build_compare`.
///
/// Only the data type of `left` is inspected when choosing the comparator.
fn build_compare(left: &dyn Array, right: &dyn Array) -> ArrowResult<DynComparator> {
    match left.data_type() {
        ArrowType::LargeList(_) => compare_array(left, right),
        ArrowType::Extension(name, _, _) => match name.as_str() {
            "Variant" | "VariantArray" | "VariantObject" => compare_variant(left, right),
            _ => Err(ArrowError::NotYetImplemented(format!(
                "Sort not supported for data type {:?}",
                left.data_type()
            ))),
        },
        _ => arrow_ord::build_compare(left, right),
    }
}
259 changes: 259 additions & 0 deletions common/datablocks/tests/it/kernels/data_block_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use common_datablocks::*;
use common_datavalues::prelude::*;
use common_exception::Result;
use serde_json::json;

#[test]
fn test_data_block_sort() -> Result<()> {
Expand Down Expand Up @@ -69,6 +70,158 @@ fn test_data_block_sort() -> Result<()> {
];
common_datablocks::assert_blocks_eq(expected, &[results]);
}

{
let options = vec![SortColumnDescription {
column_name: "b".to_owned(),
asc: true,
nulls_first: false,
}];
let results = DataBlock::sort_block(&raw, &options, Some(3))?;
assert_eq!(raw.schema(), results.schema());

let expected = vec![
"+---+----+",
"| a | b |",
"+---+----+",
"| 6 | b1 |",
"| 4 | b2 |",
"| 3 | b3 |",
"+---+----+",
];
common_datablocks::assert_blocks_eq(expected, &[results]);
}

{
let options = vec![SortColumnDescription {
column_name: "b".to_owned(),
asc: false,
nulls_first: false,
}];
let results = DataBlock::sort_block(&raw, &options, Some(3))?;
assert_eq!(raw.schema(), results.schema());

let expected = vec![
"+---+----+",
"| a | b |",
"+---+----+",
"| 7 | b6 |",
"| 1 | b5 |",
"| 2 | b4 |",
"+---+----+",
];
common_datablocks::assert_blocks_eq(expected, &[results]);
}

let schema = DataSchemaRefExt::create(vec![
DataField::new("c", VariantValue::to_data_type()),
DataField::new("d", ArrayType::new_impl(i64::to_data_type())),
]);

let raw = DataBlock::create(schema, vec![
Series::from_data(vec![
VariantValue::from(json!(true)),
VariantValue::from(json!(123)),
VariantValue::from(json!(12.34)),
VariantValue::from(json!("abc")),
VariantValue::from(json!([1, 2, 3])),
VariantValue::from(json!({"a":"b"})),
]),
Series::from_data(vec![
ArrayValue::new(vec![1_i64.into(), 2_i64.into()]),
ArrayValue::new(vec![1_i64.into(), 2_i64.into(), 3_i64.into()]),
ArrayValue::new(vec![1_i64.into(), 2_i64.into(), 4_i64.into()]),
ArrayValue::new(vec![4_i64.into(), 5_i64.into(), 6_i64.into()]),
ArrayValue::new(vec![6_i64.into()]),
ArrayValue::new(vec![7_i64.into(), 8_i64.into(), 9_i64.into()]),
]),
]);

{
let options = vec![SortColumnDescription {
column_name: "c".to_owned(),
asc: true,
nulls_first: false,
}];
let results = DataBlock::sort_block(&raw, &options, Some(3))?;
assert_eq!(raw.schema(), results.schema());

let expected = vec![
"+-------+-----------+",
"| c | d |",
"+-------+-----------+",
"| true | [1, 2] |",
"| 12.34 | [1, 2, 4] |",
"| 123 | [1, 2, 3] |",
"+-------+-----------+",
];
common_datablocks::assert_blocks_eq(expected, &[results]);
}

{
let options = vec![SortColumnDescription {
column_name: "c".to_owned(),
asc: false,
nulls_first: false,
}];
let results = DataBlock::sort_block(&raw, &options, Some(3))?;
assert_eq!(raw.schema(), results.schema());

let expected = vec![
"+-----------+-----------+",
"| c | d |",
"+-----------+-----------+",
"| [1,2,3] | [6] |",
"| {\"a\":\"b\"} | [7, 8, 9] |",
"| \"abc\" | [4, 5, 6] |",
"+-----------+-----------+",
];
common_datablocks::assert_blocks_eq(expected, &[results]);
}

{
let options = vec![SortColumnDescription {
column_name: "d".to_owned(),
asc: true,
nulls_first: false,
}];
println!("raw={:?}", raw);
let results = DataBlock::sort_block(&raw, &options, Some(3))?;
assert_eq!(raw.schema(), results.schema());

let expected = vec![
"+-------+-----------+",
"| c | d |",
"+-------+-----------+",
"| true | [1, 2] |",
"| 123 | [1, 2, 3] |",
"| 12.34 | [1, 2, 4] |",
"+-------+-----------+",
];
common_datablocks::assert_blocks_eq(expected, &[results]);
}

{
let options = vec![SortColumnDescription {
column_name: "d".to_owned(),
asc: false,
nulls_first: false,
}];
let results = DataBlock::sort_block(&raw, &options, Some(3))?;
assert_eq!(raw.schema(), results.schema());

let expected = vec![
"+-----------+-----------+",
"| c | d |",
"+-----------+-----------+",
"| {\"a\":\"b\"} | [7, 8, 9] |",
"| [1,2,3] | [6] |",
"| \"abc\" | [4, 5, 6] |",
"+-----------+-----------+",
];
common_datablocks::assert_blocks_eq(expected, &[results]);
}

Ok(())
}

Expand Down Expand Up @@ -114,5 +267,111 @@ fn test_data_block_merge_sort() -> Result<()> {
common_datablocks::assert_blocks_eq(expected, &[results]);
}

{
let options = vec![SortColumnDescription {
column_name: "b".to_owned(),
asc: true,
nulls_first: false,
}];
let results = DataBlock::merge_sort_block(&raw1, &raw2, &options, None)?;

assert_eq!(raw1.schema(), results.schema());

let expected = vec![
"+---+----+",
"| a | b |",
"+---+----+",
"| 3 | b1 |",
"| 5 | b2 |",
"| 7 | b3 |",
"| 2 | b4 |",
"| 4 | b5 |",
"| 6 | b6 |",
"+---+----+",
];
common_datablocks::assert_blocks_eq(expected, &[results]);
}

let schema = DataSchemaRefExt::create(vec![
DataField::new("c", VariantValue::to_data_type()),
DataField::new("d", ArrayType::new_impl(i64::to_data_type())),
]);

let raw1 = DataBlock::create(schema.clone(), vec![
Series::from_data(vec![
VariantValue::from(json!(true)),
VariantValue::from(json!("abc")),
VariantValue::from(json!([1, 2, 3])),
]),
Series::from_data(vec![
ArrayValue::new(vec![1_i64.into(), 2_i64.into()]),
ArrayValue::new(vec![1_i64.into(), 2_i64.into(), 3_i64.into()]),
ArrayValue::new(vec![6_i64.into()]),
]),
]);

let raw2 = DataBlock::create(schema, vec![
Series::from_data(vec![
VariantValue::from(json!(12.34)),
VariantValue::from(json!(123)),
VariantValue::from(json!({"a":"b"})),
]),
Series::from_data(vec![
ArrayValue::new(vec![1_i64.into(), 2_i64.into(), 4_i64.into()]),
ArrayValue::new(vec![4_i64.into(), 5_i64.into(), 6_i64.into()]),
ArrayValue::new(vec![7_i64.into(), 8_i64.into(), 9_i64.into()]),
]),
]);

{
let options = vec![SortColumnDescription {
column_name: "c".to_owned(),
asc: true,
nulls_first: false,
}];
let results = DataBlock::merge_sort_block(&raw1, &raw2, &options, None)?;

assert_eq!(raw1.schema(), results.schema());

let expected = vec![
"+-----------+-----------+",
"| c | d |",
"+-----------+-----------+",
"| true | [1, 2] |",
"| 12.34 | [1, 2, 4] |",
"| 123 | [4, 5, 6] |",
"| \"abc\" | [1, 2, 3] |",
"| {\"a\":\"b\"} | [7, 8, 9] |",
"| [1,2,3] | [6] |",
"+-----------+-----------+",
];
common_datablocks::assert_blocks_eq(expected, &[results]);
}

{
let options = vec![SortColumnDescription {
column_name: "d".to_owned(),
asc: true,
nulls_first: false,
}];
let results = DataBlock::merge_sort_block(&raw1, &raw2, &options, None)?;

assert_eq!(raw1.schema(), results.schema());

let expected = vec![
"+-----------+-----------+",
"| c | d |",
"+-----------+-----------+",
"| true | [1, 2] |",
"| \"abc\" | [1, 2, 3] |",
"| 12.34 | [1, 2, 4] |",
"| 123 | [4, 5, 6] |",
"| [1,2,3] | [6] |",
"| {\"a\":\"b\"} | [7, 8, 9] |",
"+-----------+-----------+",
];
common_datablocks::assert_blocks_eq(expected, &[results]);
}

Ok(())
}
4 changes: 2 additions & 2 deletions common/exception/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ common-arrow = { path = "../arrow" }
anyhow = "1.0.56"
octocrab = "0.15.4"
paste = "1.0.7"
prost = "=0.9.0"
prost = "=0.10.1"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
thiserror = "1.0.30"
time = "0.3.9"
tonic = "=0.6.2"
tonic = "=0.7.2"

# Github dependencies
bincode = { git = "https://github.com/datafuse-extras/bincode", rev = "fd3f9ff" }
Expand Down
4 changes: 2 additions & 2 deletions common/exception/src/exception_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ impl From<std::num::TryFromIntError> for ErrorCode {
}
}

impl From<common_arrow::arrow::error::ArrowError> for ErrorCode {
fn from(error: common_arrow::arrow::error::ArrowError) -> Self {
impl From<common_arrow::arrow::error::Error> for ErrorCode {
fn from(error: common_arrow::arrow::error::Error) -> Self {
ErrorCode::from_std_error(error)
}
}
Expand Down
Loading

0 comments on commit ddb4363

Please sign in to comment.