Skip to content

Commit

Permalink
build(deps): update datafusion to latest and arrow to 51.0 (#3661)
Browse files Browse the repository at this point in the history
* chore: update datafusion

* update sqlness case of time.sql

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: adjust range query partition

* fix: hisogram incorrect result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: ignore filter pushdown temporarily

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: update limit sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: histogram with wrong distribution

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: update negative ordinal sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: bump df to cd7a00b

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* resolve conflicts

* ignore test_range_filter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix promql exec panic

* fix "select count(*)" exec error

* re-enable the "test_range_filter" test since the filter push down seems not necessary to be removed

* fix: range query schema error

* update sqlness results

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* resolve conflicts

* update datafusion, again

* fix pyo3 compile error, and update some sqlness results

* update decimal sqlness cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: promql literal

* fix udaf tests

* fix filter pushdown sqlness tests

* fix?: test_cast

* fix: rspy test fail due to datafusion `sin` signature change

* rebase main to see if there are any failed tests

* debug ci

* debug ci

* debug ci

* enforce input partition

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* debug ci

* fix ci

* fix ci

* debug ci

* debug ci

* debug ci

* fix sqlness

* feat: do not return error while creating a filter

* chore: remove array from error

* chore: replace todo with unimplemented

* Update src/flow/clippy.toml

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: WUJingdi <taylor-lagrange@qq.com>
Co-authored-by: discord9 <discord9@163.com>
Co-authored-by: evenyag <realevenyag@gmail.com>
Co-authored-by: tison <wander4096@gmail.com>
  • Loading branch information
6 people authored Apr 18, 2024
1 parent 5107822 commit 314f270
Show file tree
Hide file tree
Showing 174 changed files with 2,845 additions and 2,239 deletions.
884 changes: 560 additions & 324 deletions Cargo.lock

Large diffs are not rendered by default.

49 changes: 28 additions & 21 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,18 @@ clippy.implicit_clone = "warn"
rust.unknown_lints = "deny"

[workspace.dependencies]
# We turn off default-features for some dependencies here so the workspaces which inherit them can
# selectively turn them on if needed, since we can override default-features = true (from false)
# for the inherited dependency but cannot do the reverse (override from true to false).
#
# See for more detaiils: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.3"
arrow = { version = "47.0" }
arrow-array = "47.0"
arrow-flight = "47.0"
arrow-ipc = { version = "47.0", features = ["lz4"] }
arrow-schema = { version = "47.0", features = ["serde"] }
arrow = { version = "51.0.0", features = ["prettyprint"] }
arrow-array = { version = "51.0.0", default-features = false, features = ["chrono-tz"] }
arrow-flight = "51.0"
arrow-ipc = { version = "51.0.0", default-features = false, features = ["lz4"] }
arrow-schema = { version = "51.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
axum = { version = "0.6", features = ["headers"] }
Expand All @@ -91,20 +96,22 @@ bytes = { version = "1.5", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.4", features = ["derive"] }
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = "0.12"
# TODO(LFC): Wait for https://github.com/etcdv3/etcd-client/pull/76
etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev = "4c371e9b3ea8e0a8ee2f9cbd7ded26e54a45df3b" }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "04d78b6e025ceb518040fdd10858c2a9d9345820" }
greptime-proto = { git = "https://github.com/MichaelScofield/greptime-proto.git", rev = "bdbd4cfa871ec8d192d3dbabf11debcb2cb67748" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand All @@ -115,12 +122,12 @@ moka = "0.12"
notify = "6.1"
num_cpus = "1.16"
once_cell = "1.18"
opentelemetry-proto = { git = "https://github.com/waynexia/opentelemetry-rust.git", rev = "33841b38dda79b15f2024952be5f32533325ca02", features = [
opentelemetry-proto = { version = "0.5", features = [
"gen-tonic",
"metrics",
"trace",
] }
parquet = "47.0"
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
Expand All @@ -144,18 +151,18 @@ serde_with = "3"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.7"
sysinfo = "0.30"
# on branch v0.38.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "6a93567ae38d42be5c8d08b13c8ff4dde26502ef", features = [
# on branch v0.44.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "c919990bf62ad38d2b0c0a3bc90b26ad919d51b0", features = [
"visitor",
] }
strum = { version = "0.25", features = ["derive"] }
tempfile = "3"
tokio = { version = "1.28", features = ["full"] }
tokio = { version = "1.36", features = ["full"] }
tokio-stream = { version = "0.1" }
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.10", features = ["tls"] }
uuid = { version = "1", features = ["serde", "v4", "fast-rng"] }
tonic = { version = "0.11", features = ["tls"] }
uuid = { version = "1.7", features = ["serde", "v4", "fast-rng"] }
zstd = "0.13"

## workspaces members
Expand Down
34 changes: 2 additions & 32 deletions benchmarks/src/bin/nyc-taxi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,37 +215,7 @@ fn build_values(column: &ArrayRef) -> (Values, ColumnDataType) {
ColumnDataType::String,
)
}
DataType::Null
| DataType::Boolean
| DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Float16
| DataType::Float32
| DataType::Date32
| DataType::Date64
| DataType::Time32(_)
| DataType::Time64(_)
| DataType::Duration(_)
| DataType::Interval(_)
| DataType::Binary
| DataType::FixedSizeBinary(_)
| DataType::LargeBinary
| DataType::LargeUtf8
| DataType::List(_)
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_)
| DataType::Struct(_)
| DataType::Union(_, _)
| DataType::Dictionary(_, _)
| DataType::Decimal128(_, _)
| DataType::Decimal256(_, _)
| DataType::RunEndEncoded(_, _)
| DataType::Map(_, _) => todo!(),
_ => unimplemented!(),
}
}

Expand Down Expand Up @@ -444,7 +414,7 @@ fn create_table_expr(table_name: &str) -> CreateTableExpr {
fn query_set(table_name: &str) -> HashMap<String, String> {
HashMap::from([
(
"count_all".to_string(),
"count_all".to_string(),
format!("SELECT COUNT(*) FROM {table_name};"),
),
(
Expand Down
58 changes: 11 additions & 47 deletions src/catalog/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ impl DfTableSourceProvider {
}
}

pub fn resolve_table_ref<'a>(
&'a self,
table_ref: TableReference<'a>,
) -> Result<ResolvedTableReference<'a>> {
pub fn resolve_table_ref(&self, table_ref: TableReference) -> Result<ResolvedTableReference> {
if self.disallow_cross_catalog_query {
match &table_ref {
TableReference::Bare { .. } => (),
Expand All @@ -76,7 +73,7 @@ impl DfTableSourceProvider {

pub async fn resolve_table(
&mut self,
table_ref: TableReference<'_>,
table_ref: TableReference,
) -> Result<Arc<dyn TableSource>> {
let table_ref = self.resolve_table_ref(table_ref)?;

Expand Down Expand Up @@ -106,8 +103,6 @@ impl DfTableSourceProvider {

#[cfg(test)]
mod tests {
use std::borrow::Cow;

use session::context::QueryContext;

use super::*;
Expand All @@ -120,68 +115,37 @@ mod tests {
let table_provider =
DfTableSourceProvider::new(MemoryCatalogManager::with_default_setup(), true, query_ctx);

let table_ref = TableReference::Bare {
table: Cow::Borrowed("table_name"),
};
let table_ref = TableReference::bare("table_name");
let result = table_provider.resolve_table_ref(table_ref);
assert!(result.is_ok());

let table_ref = TableReference::Partial {
schema: Cow::Borrowed("public"),
table: Cow::Borrowed("table_name"),
};
let table_ref = TableReference::partial("public", "table_name");
let result = table_provider.resolve_table_ref(table_ref);
assert!(result.is_ok());

let table_ref = TableReference::Partial {
schema: Cow::Borrowed("wrong_schema"),
table: Cow::Borrowed("table_name"),
};
let table_ref = TableReference::partial("wrong_schema", "table_name");
let result = table_provider.resolve_table_ref(table_ref);
assert!(result.is_ok());

let table_ref = TableReference::Full {
catalog: Cow::Borrowed("greptime"),
schema: Cow::Borrowed("public"),
table: Cow::Borrowed("table_name"),
};
let table_ref = TableReference::full("greptime", "public", "table_name");
let result = table_provider.resolve_table_ref(table_ref);
assert!(result.is_ok());

let table_ref = TableReference::Full {
catalog: Cow::Borrowed("wrong_catalog"),
schema: Cow::Borrowed("public"),
table: Cow::Borrowed("table_name"),
};
let table_ref = TableReference::full("wrong_catalog", "public", "table_name");
let result = table_provider.resolve_table_ref(table_ref);
assert!(result.is_err());

let table_ref = TableReference::Partial {
schema: Cow::Borrowed("information_schema"),
table: Cow::Borrowed("columns"),
};
let table_ref = TableReference::partial("information_schema", "columns");
let result = table_provider.resolve_table_ref(table_ref);
assert!(result.is_ok());

let table_ref = TableReference::Full {
catalog: Cow::Borrowed("greptime"),
schema: Cow::Borrowed("information_schema"),
table: Cow::Borrowed("columns"),
};
let table_ref = TableReference::full("greptime", "information_schema", "columns");
assert!(table_provider.resolve_table_ref(table_ref).is_ok());

let table_ref = TableReference::Full {
catalog: Cow::Borrowed("dummy"),
schema: Cow::Borrowed("information_schema"),
table: Cow::Borrowed("columns"),
};
let table_ref = TableReference::full("dummy", "information_schema", "columns");
assert!(table_provider.resolve_table_ref(table_ref).is_err());

let table_ref = TableReference::Full {
catalog: Cow::Borrowed("greptime"),
schema: Cow::Borrowed("greptime_private"),
table: Cow::Borrowed("columns"),
};
let table_ref = TableReference::full("greptime", "greptime_private", "columns");
assert!(table_provider.resolve_table_ref(table_ref).is_ok());
}
}
2 changes: 1 addition & 1 deletion src/common/datasource/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ derive_builder.workspace = true
futures.workspace = true
lazy_static.workspace = true
object-store.workspace = true
orc-rust = "0.2"
orc-rust = { git = "https://github.com/MichaelScofield/orc-rs.git", rev = "17347f5f084ac937863317df882218055c4ea8c1" }
parquet.workspace = true
paste = "1.0"
regex = "1.7"
Expand Down
2 changes: 1 addition & 1 deletion src/common/datasource/src/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl CsvConfig {
let mut builder = csv::ReaderBuilder::new(self.file_schema.clone())
.with_delimiter(self.delimiter)
.with_batch_size(self.batch_size)
.has_header(self.has_header);
.with_header(self.has_header);

if let Some(proj) = &self.file_projection {
builder = builder.with_projection(proj.clone());
Expand Down
3 changes: 2 additions & 1 deletion src/common/datasource/src/file_format/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::vec;

use common_test_util::find_workspace_path;
use datafusion::assert_batches_eq;
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream, ParquetExec};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
Expand Down Expand Up @@ -166,7 +167,7 @@ async fn test_parquet_exec() {
.to_string();
let base_config = scan_config(schema.clone(), None, path);

let exec = ParquetExec::new(base_config, None, None)
let exec = ParquetExec::new(base_config, None, None, TableParquetOptions::default())
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(store)));

let ctx = SessionContext::new();
Expand Down
6 changes: 3 additions & 3 deletions src/common/datasource/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datafusion::common::Statistics;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileScanConfig, FileStream};
Expand Down Expand Up @@ -72,17 +73,16 @@ pub fn test_basic_schema() -> SchemaRef {
pub fn scan_config(file_schema: SchemaRef, limit: Option<usize>, filename: &str) -> FileScanConfig {
// object_store only recognize the Unix style path, so make it happy.
let filename = &filename.replace('\\', "/");

let statistics = Statistics::new_unknown(file_schema.as_ref());
FileScanConfig {
object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
file_schema,
file_groups: vec![vec![PartitionedFile::new(filename.to_string(), 10)]],
statistics: Default::default(),
statistics,
projection: None,
limit,
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
}
}

Expand Down
22 changes: 5 additions & 17 deletions src/common/function/src/scalars/aggregate/diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where
.map(|&n| n.into())
.collect::<Vec<Value>>();
Ok(vec![Value::List(ListValue::new(
Some(Box::new(nums)),
nums,
I::LogicalType::build_data_type(),
))])
}
Expand Down Expand Up @@ -120,10 +120,7 @@ where
O::from_native(native).into()
})
.collect::<Vec<Value>>();
let diff = Value::List(ListValue::new(
Some(Box::new(diff)),
O::LogicalType::build_data_type(),
));
let diff = Value::List(ListValue::new(diff, O::LogicalType::build_data_type()));
Ok(diff)
}
}
Expand Down Expand Up @@ -218,10 +215,7 @@ mod test {
let values = vec![Value::from(2_i64), Value::from(1_i64)];
diff.update_batch(&v).unwrap();
assert_eq!(
Value::List(ListValue::new(
Some(Box::new(values)),
ConcreteDataType::int64_datatype()
)),
Value::List(ListValue::new(values, ConcreteDataType::int64_datatype())),
diff.evaluate().unwrap()
);

Expand All @@ -236,10 +230,7 @@ mod test {
let values = vec![Value::from(5_i64), Value::from(1_i64)];
diff.update_batch(&v).unwrap();
assert_eq!(
Value::List(ListValue::new(
Some(Box::new(values)),
ConcreteDataType::int64_datatype()
)),
Value::List(ListValue::new(values, ConcreteDataType::int64_datatype())),
diff.evaluate().unwrap()
);

Expand All @@ -252,10 +243,7 @@ mod test {
let values = vec![Value::from(0_i64), Value::from(0_i64), Value::from(0_i64)];
diff.update_batch(&v).unwrap();
assert_eq!(
Value::List(ListValue::new(
Some(Box::new(values)),
ConcreteDataType::int64_datatype()
)),
Value::List(ListValue::new(values, ConcreteDataType::int64_datatype())),
diff.evaluate().unwrap()
);
}
Expand Down
5 changes: 1 addition & 4 deletions src/common/function/src/scalars/aggregate/percentile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,7 @@ where
.map(|&n| n.into())
.collect::<Vec<Value>>();
Ok(vec![
Value::List(ListValue::new(
Some(Box::new(nums)),
T::LogicalType::build_data_type(),
)),
Value::List(ListValue::new(nums, T::LogicalType::build_data_type())),
self.p.into(),
])
}
Expand Down
5 changes: 1 addition & 4 deletions src/common/function/src/scalars/aggregate/polyval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ where
.map(|&n| n.into())
.collect::<Vec<Value>>();
Ok(vec![
Value::List(ListValue::new(
Some(Box::new(nums)),
T::LogicalType::build_data_type(),
)),
Value::List(ListValue::new(nums, T::LogicalType::build_data_type())),
self.x.into(),
])
}
Expand Down
Loading

0 comments on commit 314f270

Please sign in to comment.