diff --git a/Cargo.lock b/Cargo.lock index 859e86e962c8..5b5e4c190107 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1808,6 +1808,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "buf-list" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f56bd1685d994a3e2a3ed802eb1ecee8cb500b0ad4df48cb4d5d1a2f04749c3a" +dependencies = [ + "bytes", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -3568,7 +3577,6 @@ dependencies = [ "libm", "match-template", "md-5", - "multiversion", "naive-cityhash", "num-traits", "once_cell", @@ -5132,9 +5140,11 @@ dependencies = [ "backoff", "backon 0.4.4", "base64 0.21.7", + "buf-list", "bumpalo", "byte-unit", "byteorder", + "bytes", "chrono", "chrono-tz 0.8.6", "config", @@ -8538,7 +8548,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -9570,7 +9580,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -10198,28 +10208,6 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" -[[package]] -name = "multiversion" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4851161a11d3ad0bf9402d90ffc3967bf231768bfd7aeb61755ad06dbf1a142" -dependencies = [ - "multiversion-macros", - "target-features", -] - -[[package]] -name = "multiversion-macros" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79a74ddee9e0c27d2578323c13905793e91622148f138ba29738f9dddb835e90" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", - "target-features", -] - [[package]] name = "mur3" version = "0.1.0" @@ -12110,7 +12098,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.58", @@ -12254,7 +12242,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.11.2", + "parking_lot 0.12.3", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -14953,12 +14941,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" -[[package]] -name = "target-features" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1bbb9f3c5c463a01705937a24fdabc5047929ac764b2d5b9cf681c1f5041ed5" - [[package]] name = "target-lexicon" version = "0.12.16" @@ -15291,9 +15273,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.11" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" dependencies = [ "bytes", "futures-core", @@ -15614,7 +15596,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.7.3", + "rand 0.8.5", "static_assertions", ] diff --git a/Cargo.toml b/Cargo.toml index 
d3d9aad56f3f..c8273d4b01bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -351,18 +351,26 @@ useless_format = "allow" mutable_key_type = "allow" result_large_err = "allow" +## DON'T DELETE THIS: If we want the best performance, we should use this profile, but it will take longer to compile. +## Test SQL: +## select sum(number) from numbers_mt(10000000000); ~ 3x performance +## select max(number) from numbers_mt(10000000000); ~ 3x performance +# [profile.release] +# debug = 1 +# lto = "thin" +# overflow-checks = false +# incremental = false +# codegen-units = 1 + [profile.release] debug = 1 lto = "thin" overflow-checks = false +opt-level = "s" ## default is 3 incremental = false -opt-level = "s" - -# codegen-units = 1 # Reduce number of codegen units to increase optimizations. # [profile.release.package] -# arrow2 = { codegen-units = 4 } -# common-functions = { codegen-units = 16 } +# databend-common-arrow = { codegen-units = 16 } # databend-query = { codegen-units = 4 } # databend-binaries = { codegen-units = 4 } diff --git a/Makefile b/Makefile index 0d5747fdcd85..d527f978dcf4 100644 --- a/Makefile +++ b/Makefile @@ -48,6 +48,10 @@ run-debug: build run-debug-management: build bash ./scripts/ci/deploy/databend-query-management-mode.sh +kill: + killall databend-query + killall databend-meta + build: bash ./scripts/build/build-debug.sh diff --git a/src/common/base/src/base/dma.rs b/src/common/base/src/base/dma.rs index 192a6ecc5e46..6f92a2043a62 100644 --- a/src/common/base/src/base/dma.rs +++ b/src/common/base/src/base/dma.rs @@ -16,23 +16,92 @@ use std::alloc::AllocError; use std::alloc::Allocator; use std::alloc::Global; use std::alloc::Layout; +use std::fmt; use std::io; use std::io::IoSlice; use std::io::SeekFrom; +use std::io::Write; use std::ops::Range; use std::os::fd::BorrowedFd; use std::os::unix::io::AsRawFd; use std::path::Path; -use std::ptr::Alignment; +use std::ptr; use std::ptr::NonNull; +use rustix::fs::OFlags; use tokio::fs::File; use tokio::io::AsyncSeekExt; use crate::runtime::spawn_blocking; +#[derive(Copy, Clone, PartialEq, Eq)] + +pub struct Alignment(ptr::Alignment); + +impl Alignment { + pub const MIN: Self = Self(ptr::Alignment::MIN); + + #[inline] + pub const fn new(align: usize) -> Option<Self> { + match ptr::Alignment::new(align) { + Some(a) => Some(Alignment(a)), + None => None, + } + } + + #[inline] + pub const fn as_usize(self) -> usize { + self.0.as_usize() + } + + #[inline] + pub const fn align_up(self, value: usize) -> usize { + (value + self.as_usize() - 1) & self.mask() + } + + #[inline] + pub const fn align_down(self, value: usize) -> usize { + value & self.mask() + } + + #[inline] + pub const fn align_up_count(self, value: usize) -> usize { + (value + self.as_usize() - 1) >> self.log2() + } + + #[inline] + pub const fn align_down_count(self, value: usize) -> usize { + value >> self.log2() + } + + #[inline] + pub const fn mask(self) -> usize { + self.0.mask() + } + + #[inline] + pub const fn log2(self) -> u32 { + self.0.log2() + } +} + +impl fmt::Debug for Alignment { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self.0) + } +} + +impl TryFrom<usize> for Alignment { + type Error = std::num::TryFromIntError; + + fn try_from(value: usize) -> Result<Self, Self::Error> { + Ok(Alignment(value.try_into()?)) + } +} + unsafe impl Send for DmaAllocator {} +#[derive(Clone, Copy)] pub struct DmaAllocator(Alignment); impl DmaAllocator { @@ -41,11 +110,15 @@ impl DmaAllocator { } fn real_layout(&self, layout: Layout) -> Layout { -
Layout::from_size_align(layout.size(), self.0.as_usize()).unwrap() + if layout.align() >= self.0.as_usize() { + layout + } else { + Layout::from_size_align(layout.size(), self.0.as_usize()).unwrap() + } } fn real_cap(&self, cap: usize) -> usize { - align_up(self.0, cap) + self.0.align_up(cap) } } @@ -108,81 +181,61 @@ struct DmaFile { buf: Option<DmaBuffer>, } -#[cfg(target_os = "linux")] -pub mod linux { - use rustix::fs::OFlags; - - use super::*; - - impl DmaFile { - /// Attempts to open a file in read-only mode. - pub(super) async fn open(path: impl AsRef<Path>) -> io::Result<DmaFile> { - let file = File::options() - .read(true) - .custom_flags(OFlags::DIRECT.bits() as i32) - .open(path) - .await?; - - open_dma(file).await +impl DmaFile { + async fn open_raw(path: impl AsRef<Path>, dio: bool) -> io::Result<File> { + let mut flags = 0; + #[cfg(target_os = "linux")] + if dio { + flags = OFlags::DIRECT.bits() as i32 } - /// Opens a file in write-only mode. - pub(super) async fn create(path: impl AsRef<Path>) -> io::Result<DmaFile> { - let file = File::options() - .write(true) - .create(true) - .truncate(true) - .custom_flags((OFlags::DIRECT | OFlags::EXCL).bits() as i32) - .open(path) - .await?; - - open_dma(file).await - } + File::options() + .read(true) + .custom_flags(flags) + .open(path) + .await } -} - -#[cfg(not(target_os = "linux"))] -pub mod not_linux { - use rustix::fs::OFlags; - use super::*; + async fn create_raw(path: impl AsRef<Path>, dio: bool) -> io::Result<File> { + let mut flags = OFlags::EXCL; + #[cfg(target_os = "linux")] + if dio { + flags |= OFlags::DIRECT; + } - impl DmaFile { - /// Attempts to open a file in read-only mode. - pub(super) async fn open(path: impl AsRef<Path>) -> io::Result<DmaFile> { - let file = File::options().read(true).open(path).await?; + File::options() + .write(true) + .create(true) + .truncate(true) + .custom_flags(flags.bits() as i32) + .open(path) + .await + } - open_dma(file).await - } + /// Attempts to open a file in read-only mode. + async fn open(path: impl AsRef<Path>, dio: bool) -> io::Result<DmaFile> { + let file = DmaFile::open_raw(path, dio).await?; + open_dma(file).await + } - /// Opens a file in write-only mode. - pub(super) async fn create(path: impl AsRef<Path>) -> io::Result<DmaFile> { - let file = File::options() - .write(true) - .create(true) - .truncate(true) - .custom_flags(OFlags::EXCL.bits() as i32) - .open(path) - .await?; - - open_dma(file).await - } + /// Opens a file in write-only mode. + async fn create(path: impl AsRef<Path>, dio: bool) -> io::Result<DmaFile> { + let file = DmaFile::create_raw(path, dio).await?; + open_dma(file).await } -} -impl DmaFile { fn set_buffer(&mut self, buf: DmaBuffer) { self.buf = Some(buf) } /// Aligns `value` up to the memory alignment requirement for this file. pub fn align_up(&self, value: usize) -> usize { - align_up(self.alignment, value) + self.alignment.align_up(value) } /// Aligns `value` down to the memory alignment requirement for this file. pub fn align_down(&self, value: usize) -> usize { - align_down(self.alignment, value) + self.alignment.align_down(value) } /// Return the alignment requirement for this file. 
The returned alignment value can be used @@ -240,14 +293,6 @@ impl DmaFile { } } -pub fn align_up(alignment: Alignment, value: usize) -> usize { - (value + alignment.as_usize() - 1) & alignment.mask() -} - -pub fn align_down(alignment: Alignment, value: usize) -> usize { - value & alignment.mask() -} - async fn open_dma(file: File) -> io::Result<DmaFile> { let stat = fstatvfs(&file).await?; let alignment = Alignment::new(stat.f_bsize.max(512) as usize).unwrap(); @@ -282,11 +327,108 @@ where } } +pub struct DmaWriteBuf { + allocator: DmaAllocator, + data: Vec<Vec<u8, DmaAllocator>>, + chunk: usize, +} + +impl DmaWriteBuf { + pub fn new(align: Alignment, chunk: usize) -> DmaWriteBuf { + DmaWriteBuf { + allocator: DmaAllocator::new(align), + data: Vec::new(), + chunk: align.align_up(chunk), + } + } + + pub fn size(&self) -> usize { + if self.data.is_empty() { + return 0; + } + + (self.data.len() - 1) * self.chunk + self.data.last().unwrap().len() + } + + pub async fn into_file(mut self, path: impl AsRef<Path>, dio: bool) -> io::Result<usize> { + let mut file = DmaFile { + fd: DmaFile::create_raw(path, dio).await?, + alignment: self.allocator.0, + buf: None, + }; + + let file_length = self.size(); + + let Some(mut last) = self.data.pop() else { + return Ok(0); + }; + + for buf in self.data { + debug_assert_eq!(buf.len(), buf.capacity()); + file.set_buffer(buf); + file = asyncify(move || file.write_direct().map(|_| file)).await?; + } + + let len = last.len(); + let align_up = file.align_up(len); + if align_up == len { + file.set_buffer(last); + asyncify(move || file.write_direct()).await?; + } else { + unsafe { last.set_len(align_up) } + file.set_buffer(last); + asyncify(move || { + file.write_direct()?; + file.truncate(file_length) + }) + .await?; + } + Ok(file_length) + } + + pub fn into_data(self) -> Vec<Vec<u8, DmaAllocator>> { + self.data + } +} + +impl Write for DmaWriteBuf { + fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> { + let n = buf.len(); + while !buf.is_empty() { + let (dst, remain) = match self.data.last_mut() { + Some(dst) if dst.len() < dst.capacity() => { + let remain = dst.capacity() - dst.len(); + (dst, remain) + } + _ => { + self.data + .push(Vec::with_capacity_in(self.chunk, self.allocator)); + (self.data.last_mut().unwrap(), self.chunk) + } + }; + + if buf.len() <= remain { + dst.extend_from_slice(buf); + buf = &buf[buf.len()..] 
+ } else { + let (left, right) = buf.split_at(remain); + dst.extend_from_slice(left); + buf = right + } + } + Ok(n) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + pub async fn dma_write_file_vectored<'a>( path: impl AsRef<Path>, bufs: &'a [IoSlice<'a>], ) -> io::Result<usize> { - let mut file = DmaFile::create(path.as_ref()).await?; + let mut file = DmaFile::create(path.as_ref(), true).await?; let file_length = bufs.iter().map(|buf| buf.len()).sum(); if file_length == 0 { @@ -344,7 +486,7 @@ pub async fn dma_read_file( path: impl AsRef<Path>, mut writer: impl io::Write, ) -> io::Result<usize> { const BUFFER_SIZE: usize = 1024 * 1024; - let mut file = DmaFile::open(path.as_ref()).await?; + let mut file = DmaFile::open(path.as_ref(), true).await?; let buf = Vec::with_capacity_in( file.align_up(BUFFER_SIZE), DmaAllocator::new(file.alignment), @@ -379,7 +521,7 @@ pub async fn dma_read_file_range( path: impl AsRef<Path>, range: Range<u64>, ) -> io::Result<(DmaBuffer, Range<usize>)> { - let mut file = DmaFile::open(path.as_ref()).await?; + let mut file = DmaFile::open(path.as_ref(), true).await?; let align_start = file.align_down(range.start as usize); let align_end = file.align_up(range.end as usize); @@ -421,28 +563,40 @@ pub async fn dma_read_file_range( mod tests { use super::*; + #[test] + fn test_alignment() { + let a = Alignment::new(4).unwrap(); + + assert_eq!(8, a.align_up(5)); + assert_eq!(4, a.align_down(5)); + assert_eq!(2, a.align_up_count(5)); + assert_eq!(1, a.align_down_count(5)); + } + #[tokio::test] async fn test_read_write() { let _ = std::fs::remove_file("test_file"); - run_test(0).await.unwrap(); - run_test(100).await.unwrap(); - run_test(200).await.unwrap(); + for dio in [true, false] { + run_test(0, dio).await.unwrap(); + run_test(100, dio).await.unwrap(); + run_test(200, dio).await.unwrap(); - run_test(4096 - 1).await.unwrap(); - run_test(4096).await.unwrap(); - run_test(4096 + 1).await.unwrap(); + run_test(4096 - 1, dio).await.unwrap(); + run_test(4096, dio).await.unwrap(); + run_test(4096 + 1, dio).await.unwrap(); - run_test(4096 * 2 - 1).await.unwrap(); - run_test(4096 * 2).await.unwrap(); - run_test(4096 * 2 + 1).await.unwrap(); + run_test(4096 * 2 - 1, dio).await.unwrap(); + run_test(4096 * 2, dio).await.unwrap(); + run_test(4096 * 2 + 1, dio).await.unwrap(); - run_test(1024 * 1024 * 3 - 1).await.unwrap(); - run_test(1024 * 1024 * 3).await.unwrap(); - run_test(1024 * 1024 * 3 + 1).await.unwrap(); + run_test(1024 * 1024 * 3 - 1, dio).await.unwrap(); + run_test(1024 * 1024 * 3, dio).await.unwrap(); + run_test(1024 * 1024 * 3 + 1, dio).await.unwrap(); + } } - async fn run_test(n: usize) -> io::Result<()> { + async fn run_test(n: usize, dio: bool) -> io::Result<()> { let filename = "test_file"; let want = (0..n).map(|i| (i % 256) as u8).collect::<Vec<_>>(); @@ -457,6 +611,18 @@ mod tests { assert_eq!(length, want.len()); assert_eq!(got, want); + let file = DmaFile::open(filename, dio).await?; + let align = file.alignment; + drop(file); + + std::fs::remove_file(filename)?; + + let mut buf = DmaWriteBuf::new(align, align.as_usize()); + buf.write_all(&want)?; + let length = buf.into_file(filename, dio).await?; + + assert_eq!(length, want.len()); + let (buf, range) = dma_read_file_range(filename, 0..length as u64).await?; assert_eq!(&buf[range], &want); @@ -515,7 +681,7 @@ mod tests { let bufs = vec![IoSlice::new(&want)]; dma_write_file_vectored(filename, &bufs).await.unwrap(); - let mut file = DmaFile::open(filename).await.unwrap(); + let mut file = DmaFile::open(filename, true).await.unwrap(); let buf = 
Vec::with_capacity_in(file_size, DmaAllocator::new(file.alignment)); file.set_buffer(buf); diff --git a/src/common/base/src/base/mod.rs b/src/common/base/src/base/mod.rs index 5ac11ea7a46c..72e96459220c 100644 --- a/src/common/base/src/base/mod.rs +++ b/src/common/base/src/base/mod.rs @@ -32,6 +32,9 @@ pub use dma::dma_buffer_as_vec; pub use dma::dma_read_file; pub use dma::dma_read_file_range; pub use dma::dma_write_file_vectored; +pub use dma::Alignment; +pub use dma::DmaAllocator; +pub use dma::DmaWriteBuf; pub use net::get_free_tcp_port; pub use net::get_free_udp_port; pub use ordered_float::OrderedFloat; diff --git a/src/query/ast/src/ast/statements/dictionary.rs b/src/query/ast/src/ast/statements/dictionary.rs index 88508fb75c19..2ddf086c7e11 100644 --- a/src/query/ast/src/ast/statements/dictionary.rs +++ b/src/query/ast/src/ast/statements/dictionary.rs @@ -19,6 +19,7 @@ use std::fmt::Formatter; use derive_visitor::Drive; use derive_visitor::DriveMut; +use super::ShowLimit; use crate::ast::write_comma_separated_list; use crate::ast::write_dot_separated_list; use crate::ast::write_space_separated_string_map; @@ -123,3 +124,22 @@ impl Display for ShowCreateDictionaryStmt { ) } } + +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub struct ShowDictionariesStmt { + pub database: Option, + pub limit: Option, +} + +impl Display for ShowDictionariesStmt { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "SHOW DICTIONARIES")?; + if let Some(database) = &self.database { + write!(f, " FROM {database}")?; + } + if let Some(limit) = &self.limit { + write!(f, " {limit}")?; + } + Ok(()) + } +} diff --git a/src/query/ast/src/ast/statements/statement.rs b/src/query/ast/src/ast/statements/statement.rs index 0607f1682d76..945679be3a69 100644 --- a/src/query/ast/src/ast/statements/statement.rs +++ b/src/query/ast/src/ast/statements/statement.rs @@ -158,9 +158,7 @@ pub enum Statement { CreateDictionary(CreateDictionaryStmt), DropDictionary(DropDictionaryStmt), ShowCreateDictionary(ShowCreateDictionaryStmt), - ShowDictionaries { - show_options: Option, - }, + ShowDictionaries(ShowDictionariesStmt), // Columns ShowColumns(ShowColumnsStmt), @@ -613,12 +611,7 @@ impl Display for Statement { Statement::CreateDictionary(stmt) => write!(f, "{stmt}")?, Statement::DropDictionary(stmt) => write!(f, "{stmt}")?, Statement::ShowCreateDictionary(stmt) => write!(f, "{stmt}")?, - Statement::ShowDictionaries { show_options } => { - write!(f, "SHOW DICTIONARIES")?; - if let Some(show_options) = show_options { - write!(f, " {show_options}")?; - } - } + Statement::ShowDictionaries(stmt) => write!(f, "{stmt}")?, Statement::CreateView(stmt) => write!(f, "{stmt}")?, Statement::AlterView(stmt) => write!(f, "{stmt}")?, Statement::DropView(stmt) => write!(f, "{stmt}")?, diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index f2e50528b5e9..8c547eba458e 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -957,9 +957,15 @@ pub fn statement_body(i: Input) -> IResult { ); let show_dictionaries = map( rule! { - SHOW ~ DICTIONARIES ~ #show_options? + SHOW ~ DICTIONARIES ~ ((FROM|IN) ~ #ident)? ~ #show_limit? + }, + |(_, _, db, limit)| { + let database = match db { + Some((_, d)) => Some(d), + _ => None, + }; + Statement::ShowDictionaries(ShowDictionariesStmt { database, limit }) }, - |(_, _, show_options)| Statement::ShowDictionaries { show_options }, ); let show_create_dictionary = map( rule! 
{ diff --git a/src/query/expression/src/aggregate/aggregate_hashtable.rs b/src/query/expression/src/aggregate/aggregate_hashtable.rs index 00c48e83293e..07d403518e0b 100644 --- a/src/query/expression/src/aggregate/aggregate_hashtable.rs +++ b/src/query/expression/src/aggregate/aggregate_hashtable.rs @@ -49,6 +49,7 @@ pub struct AggregateHashTable { // use for append rows directly during deserialize pub direct_append: bool, pub config: HashTableConfig, + current_radix_bits: u64, entries: Vec, count: usize, @@ -585,6 +586,7 @@ impl AggregateHashTable { .iter() .map(|arena| arena.allocated_bytes()) .sum::() + + self.entries.len() * std::mem::size_of::() } } diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs index 820d5841d4a3..866f976644f3 100644 --- a/src/query/expression/src/block.rs +++ b/src/query/expression/src/block.rs @@ -240,6 +240,18 @@ impl DataBlock { self.columns().iter().map(|entry| entry.memory_size()).sum() } + pub fn consume_convert_to_full(self) -> Self { + if self + .columns() + .iter() + .all(|entry| entry.value.as_column().is_some()) + { + return self; + } + + self.convert_to_full() + } + pub fn convert_to_full(&self) -> Self { let columns = self .columns() diff --git a/src/query/expression/src/converts/arrow/to.rs b/src/query/expression/src/converts/arrow/to.rs index 98379b1cd1f0..391d2893f596 100644 --- a/src/query/expression/src/converts/arrow/to.rs +++ b/src/query/expression/src/converts/arrow/to.rs @@ -101,7 +101,7 @@ impl DataBlock { let arrow_schema = table_schema_to_arrow_schema(table_schema); let mut arrays = Vec::with_capacity(self.columns().len()); for (entry, arrow_field) in self - .convert_to_full() + .consume_convert_to_full() .columns() .iter() .zip(arrow_schema.fields()) diff --git a/src/query/expression/src/utils/arrow.rs b/src/query/expression/src/utils/arrow.rs index a57a871c09a6..7d3f20dc0127 100644 --- a/src/query/expression/src/utils/arrow.rs +++ b/src/query/expression/src/utils/arrow.rs @@ -13,7 +13,11 @@ // limitations under the License. 
use std::io::Cursor; +use std::io::Read; +use std::io::Seek; +use std::io::Write; +use databend_common_arrow::arrow; use databend_common_arrow::arrow::array::Array; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::bitmap::MutableBitmap; @@ -21,8 +25,9 @@ use databend_common_arrow::arrow::buffer::Buffer; use databend_common_arrow::arrow::datatypes::Schema; use databend_common_arrow::arrow::io::ipc::read::read_file_metadata; use databend_common_arrow::arrow::io::ipc::read::FileReader; +use databend_common_arrow::arrow::io::ipc::write::Compression; use databend_common_arrow::arrow::io::ipc::write::FileWriter; -use databend_common_arrow::arrow::io::ipc::write::WriteOptions as IpcWriteOptions; +use databend_common_arrow::arrow::io::ipc::write::WriteOptions; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -66,29 +71,31 @@ pub fn buffer_into_mut(mut buffer: Buffer) -> Vec { pub fn serialize_column(col: &Column) -> Vec { let mut buffer = Vec::new(); + write_column(col, &mut buffer).unwrap(); + buffer +} +pub fn write_column(col: &Column, w: &mut impl Write) -> arrow::error::Result<()> { let schema = Schema::from(vec![col.arrow_field()]); - let mut writer = FileWriter::new(&mut buffer, schema, None, IpcWriteOptions::default()); - writer.start().unwrap(); - writer - .write( - &databend_common_arrow::arrow::chunk::Chunk::new(vec![col.as_arrow()]), - None, - ) - .unwrap(); - writer.finish().unwrap(); - - buffer + let mut writer = FileWriter::new(w, schema, None, WriteOptions { + compression: Some(Compression::LZ4), + }); + writer.start()?; + writer.write(&arrow::chunk::Chunk::new(vec![col.as_arrow()]), None)?; + writer.finish() } pub fn deserialize_column(bytes: &[u8]) -> Result { let mut cursor = Cursor::new(bytes); + read_column(&mut cursor) +} - let metadata = read_file_metadata(&mut cursor)?; +pub fn read_column(r: &mut R) -> Result { + let metadata = read_file_metadata(r)?; let f = metadata.schema.fields[0].clone(); let data_field = DataField::try_from(&f)?; - let mut reader = FileReader::new(cursor, metadata, None, None); + let mut reader = FileReader::new(r, metadata, None, None); let col = reader .next() .ok_or_else(|| ErrorCode::Internal("expected one arrow array"))?? 
diff --git a/src/query/expression/tests/it/sort.rs b/src/query/expression/tests/it/sort.rs index 9c72d7d6ab28..1d1dbf626844 100644 --- a/src/query/expression/tests/it/sort.rs +++ b/src/query/expression/tests/it/sort.rs @@ -24,6 +24,7 @@ use databend_common_expression::FromData; use databend_common_expression::SortColumnDescription; use crate::common::new_block; +use crate::rand_block_for_all_types; #[test] fn test_block_sort() -> Result<()> { @@ -201,3 +202,52 @@ fn test_block_sort() -> Result<()> { Ok(()) } + +#[test] +fn sort_concat() { + // Sort(Sort A || Sort B) = Sort (A || B) + use databend_common_expression::DataBlock; + use itertools::Itertools; + use rand::seq::SliceRandom; + use rand::Rng; + + let mut rng = rand::thread_rng(); + let num_blocks = 100; + + for _i in 0..num_blocks { + let block_a = rand_block_for_all_types(rng.gen_range(0..100)); + let block_b = rand_block_for_all_types(rng.gen_range(0..100)); + + let mut sort_index: Vec = (0..block_a.num_columns()).collect(); + sort_index.shuffle(&mut rng); + + let sort_desc = sort_index + .iter() + .map(|i| SortColumnDescription { + offset: *i, + asc: rng.gen_bool(0.5), + nulls_first: rng.gen_bool(0.5), + is_nullable: rng.gen_bool(0.5), + }) + .collect_vec(); + + let concat_ab_0 = DataBlock::concat(&[block_a.clone(), block_b.clone()]).unwrap(); + + let sort_a = DataBlock::sort(&block_a, &sort_desc, None).unwrap(); + let sort_b = DataBlock::sort(&block_b, &sort_desc, None).unwrap(); + let concat_ab_1 = DataBlock::concat(&[sort_a, sort_b]).unwrap(); + + let block_1 = DataBlock::sort(&concat_ab_0, &sort_desc, None).unwrap(); + let block_2 = DataBlock::sort(&concat_ab_1, &sort_desc, None).unwrap(); + + assert_eq!(block_1.num_columns(), block_2.num_columns()); + assert_eq!(block_1.num_rows(), block_2.num_rows()); + + let columns_1 = block_1.columns(); + let columns_2 = block_2.columns(); + for idx in 0..columns_1.len() { + assert_eq!(columns_1[idx].data_type, columns_2[idx].data_type); + assert_eq!(columns_1[idx].value, columns_2[idx].value); + } + } +} diff --git a/src/query/functions/Cargo.toml b/src/query/functions/Cargo.toml index 16e6f6c6d2e4..615ff7f9d0d8 100644 --- a/src/query/functions/Cargo.toml +++ b/src/query/functions/Cargo.toml @@ -47,7 +47,6 @@ lexical-core = "0.8.5" libm = "0.2.6" match-template = { workspace = true } md-5 = "0.10.5" -multiversion = "0.7.4" naive-cityhash = "0.2.0" num-traits = "0.2.15" once_cell = { workspace = true } diff --git a/src/query/functions/src/aggregates/aggregate_min_max_any.rs b/src/query/functions/src/aggregates/aggregate_min_max_any.rs index 9efdc985f13d..6ab35d792d80 100644 --- a/src/query/functions/src/aggregates/aggregate_min_max_any.rs +++ b/src/query/functions/src/aggregates/aggregate_min_max_any.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use borsh::BorshDeserialize; use borsh::BorshSerialize; +use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::decimal::*; @@ -92,6 +93,36 @@ where Ok(()) } + fn add_batch( + &mut self, + other: T::Column, + validity: Option<&Bitmap>, + function_data: Option<&dyn FunctionData>, + ) -> Result<()> { + let column_len = T::column_len(&other); + if column_len == 0 { + return Ok(()); + } + + let column_iter = T::iter_column(&other); + if let Some(validity) = validity { + if validity.unset_bits() == column_len { + return Ok(()); + } + for (data, valid) in column_iter.zip(validity.iter()) { + if valid { + let _ = self.add(data, function_data); + 
} + } + } else { + let v = column_iter.reduce(|l, r| if !C::change_if(&l, &r) { l } else { r }); + if let Some(v) = v { + let _ = self.add(v, function_data); + } + } + Ok(()) + } + fn merge(&mut self, rhs: &Self) -> Result<()> { if let Some(v) = &rhs.value { self.add(T::to_scalar_ref(v), None)?; diff --git a/src/query/functions/src/aggregates/aggregate_sum.rs b/src/query/functions/src/aggregates/aggregate_sum.rs index 116ba6e46c46..355d8dfa8a41 100644 --- a/src/query/functions/src/aggregates/aggregate_sum.rs +++ b/src/query/functions/src/aggregates/aggregate_sum.rs @@ -15,6 +15,7 @@ use borsh::BorshDeserialize; use borsh::BorshSerialize; use databend_common_arrow::arrow::bitmap::Bitmap; +use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::decimal::*; @@ -80,21 +81,33 @@ where } } -#[multiversion::multiversion(targets("x86_64+avx", "x86_64+sse"))] -fn sum_batch(other: T::Column) -> N::Scalar +// #[multiversion::multiversion(targets("x86_64+avx", "x86_64+sse"))] +#[inline] +pub fn sum_batch(inner: Buffer, validity: Option<&Bitmap>) -> TSum where - T: ValueType + Sync + Send, - N: ValueType, - T::Scalar: Number + AsPrimitive, - N::Scalar: Number + AsPrimitive + std::ops::AddAssign, - for<'a> T::ScalarRef<'a>: Number + AsPrimitive, + T: Number + AsPrimitive, + TSum: Number + std::ops::AddAssign, { - // use temp variable to hint the compiler to unroll the loop - let mut sum = N::Scalar::default(); - for value in T::iter_column(&other) { - sum += value.as_(); + match validity { + Some(v) if v.unset_bits() > 0 => { + let mut sum = TSum::default(); + inner.iter().zip(v.iter()).for_each(|(t, b)| { + if b { + sum += t.as_(); + } + }); + + sum + } + _ => { + let mut sum = TSum::default(); + inner.iter().for_each(|t| { + sum += t.as_(); + }); + + sum + } } - sum } impl UnaryState for NumberSumState @@ -117,9 +130,12 @@ where fn add_batch( &mut self, other: T::Column, + validity: Option<&Bitmap>, _function_data: Option<&dyn FunctionData>, ) -> Result<()> { - self.value += sum_batch::(other); + let col = T::upcast_column(other); + let buffer = NumberType::::try_downcast_column(&col).unwrap(); + self.value += sum_batch::(buffer, validity); Ok(()) } diff --git a/src/query/functions/src/aggregates/aggregate_unary.rs b/src/query/functions/src/aggregates/aggregate_unary.rs index fa85cffcce0b..5ac0bc5a4d22 100644 --- a/src/query/functions/src/aggregates/aggregate_unary.rs +++ b/src/query/functions/src/aggregates/aggregate_unary.rs @@ -47,10 +47,22 @@ where fn add_batch( &mut self, other: T::Column, + validity: Option<&Bitmap>, function_data: Option<&dyn FunctionData>, ) -> Result<()> { - for value in T::iter_column(&other) { - self.add(value, function_data)?; + match validity { + Some(validity) => { + for (data, valid) in T::iter_column(&other).zip(validity.iter()) { + if valid { + self.add(data, function_data)?; + } + } + } + None => { + for value in T::iter_column(&other) { + self.add(value, function_data)?; + } + } } Ok(()) } @@ -206,18 +218,8 @@ where ) -> Result<()> { let column = T::try_downcast_column(&columns[0]).unwrap(); let state: &mut S = place.get::(); - match validity { - Some(bitmap) if bitmap.unset_bits() > 0 => { - let column_iter = T::iter_column(&column); - for (value, is_valid) in column_iter.zip(bitmap.iter()) { - if is_valid { - state.add(value, self.function_data.as_deref())?; - } - } - Ok(()) - } - _ => state.add_batch(column, self.function_data.as_deref()), - } + + 
state.add_batch(column, validity, self.function_data.as_deref()) } fn accumulate_row(&self, place: StateAddr, columns: InputColumns, row: usize) -> Result<()> { diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 5f0bd0831a48..a13b5364b33f 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -44,9 +44,11 @@ async-trait = { workspace = true } backoff = { version = "0.4.0", features = ["futures", "tokio"] } backon = "0.4" base64 = "0.21.0" +buf-list = "1.0.3" bumpalo = { workspace = true } byte-unit = "4.0.19" byteorder = { workspace = true } +bytes = { workspace = true } chrono = { workspace = true } chrono-tz = { workspace = true } config = { version = "0.13.4", features = [] } diff --git a/src/query/service/src/databases/system/system_database.rs b/src/query/service/src/databases/system/system_database.rs index 0ae6e34ee9dc..93702af95b02 100644 --- a/src/query/service/src/databases/system/system_database.rs +++ b/src/query/service/src/databases/system/system_database.rs @@ -35,6 +35,7 @@ use databend_common_storages_system::ConfigsTable; use databend_common_storages_system::ContributorsTable; use databend_common_storages_system::CreditsTable; use databend_common_storages_system::DatabasesTable; +use databend_common_storages_system::DictionariesTable; use databend_common_storages_system::EnginesTable; use databend_common_storages_system::FullStreamsTable; use databend_common_storages_system::FunctionsTable; @@ -144,6 +145,7 @@ impl SystemDatabase { ViewsTableWithoutHistory::create(sys_db_meta.next_table_id()), TemporaryTablesTable::create(sys_db_meta.next_table_id()), ProceduresTable::create(sys_db_meta.next_table_id()), + DictionariesTable::create(sys_db_meta.next_table_id()), ]; let disable_tables = Self::disable_system_tables(); diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index 009e2a8154e4..0eaa433bc235 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -61,10 +61,11 @@ enum ObjectId { // some statements like `SELECT 1`, `SHOW USERS`, `SHOW ROLES`, `SHOW TABLES` will be // rewritten to the queries on the system tables, we need to skip the privilege check on // these tables. 
-const SYSTEM_TABLES_ALLOW_LIST: [&str; 19] = [ +const SYSTEM_TABLES_ALLOW_LIST: [&str; 20] = [ "catalogs", "columns", "databases", + "dictionaries", "tables", "views", "tables_with_history", @@ -709,7 +710,8 @@ impl AccessChecker for PrivilegeAccess { Some(RewriteKind::ShowDatabases) | Some(RewriteKind::ShowEngines) | Some(RewriteKind::ShowFunctions) - | Some(RewriteKind::ShowUserFunctions) => { + | Some(RewriteKind::ShowUserFunctions) + | Some(RewriteKind::ShowDictionaries(_)) => { return Ok(()); } | Some(RewriteKind::ShowTableFunctions) => { diff --git a/src/query/service/src/lib.rs b/src/query/service/src/lib.rs index b9d6d07fd677..d0ea62933783 100644 --- a/src/query/service/src/lib.rs +++ b/src/query/service/src/lib.rs @@ -16,6 +16,7 @@ #![allow(internal_features)] #![allow(clippy::useless_asref)] #![allow(clippy::uninlined_format_args)] +#![feature(iter_map_windows)] #![feature(hash_raw_entry)] #![feature(core_intrinsics)] #![feature(arbitrary_self_types)] diff --git a/src/query/service/src/pipelines/builders/builder_window.rs b/src/query/service/src/pipelines/builders/builder_window.rs index 0bddf1fb2aed..c62a09092cfb 100644 --- a/src/query/service/src/pipelines/builders/builder_window.rs +++ b/src/query/service/src/pipelines/builders/builder_window.rs @@ -26,6 +26,8 @@ use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_sql::executor::physical_plans::Window; use databend_common_sql::executor::physical_plans::WindowPartition; use databend_storages_common_cache::TempDirManager; +use opendal::services::Fs; +use opendal::Operator; use crate::pipelines::processors::transforms::FrameBound; use crate::pipelines::processors::transforms::TransformWindowPartitionCollect; @@ -34,6 +36,7 @@ use crate::pipelines::processors::transforms::WindowPartitionExchange; use crate::pipelines::processors::transforms::WindowSpillSettings; use crate::pipelines::processors::TransformWindow; use crate::pipelines::PipelineBuilder; +use crate::spillers::SpillerDiskConfig; impl PipelineBuilder { pub(crate) fn build_window(&mut self, window: &Window) -> Result<()> { @@ -173,7 +176,23 @@ impl PipelineBuilder { let disk_bytes_limit = settings.get_window_partition_spilling_to_disk_bytes_limit()?; let temp_dir_manager = TempDirManager::instance(); - let disk_spill = temp_dir_manager.get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id()); + + let enable_dio = settings.get_enable_dio()?; + let disk_spill = + match temp_dir_manager.get_disk_spill_dir(disk_bytes_limit, &self.ctx.get_id()) { + Some(temp_dir) if !enable_dio => { + let builder = Fs::default().root(temp_dir.path().to_str().unwrap()); + Some(SpillerDiskConfig { + temp_dir, + local_operator: Some(Operator::new(builder)?.finish()), + }) + } + Some(temp_dir) => Some(SpillerDiskConfig { + temp_dir, + local_operator: None, + }), + None => None, + }; let window_spill_settings = WindowSpillSettings::new(&settings, num_processors)?; let have_order_col = window_partition.after_exchange.unwrap_or(false); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs index 28d6dea1eedd..cbd229d9b7ff 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::sync::Arc; +use std::time::Instant; use std::vec; use bumpalo::Bump; @@ -111,6 +112,10 @@ pub struct TransformPartialAggregate { hash_table: HashTable, probe_state: ProbeState, params: Arc, + start: Instant, + first_block_start: Option, + processed_bytes: usize, + processed_rows: usize, } impl TransformPartialAggregate { @@ -164,6 +169,10 @@ impl TransformPartialAggregate { hash_table, probe_state: ProbeState::default(), settings: AggregateSettings::try_from(ctx)?, + start: Instant::now(), + first_block_start: None, + processed_bytes: 0, + processed_rows: 0, }, )) } @@ -239,10 +248,16 @@ impl TransformPartialAggregate { .map(|index| index.is_agg) .unwrap_or_default(); - let block = block.convert_to_full(); + let block = block.consume_convert_to_full(); let group_columns = InputColumns::new_block_proxy(&self.params.group_columns, &block); let rows_num = block.num_rows(); + self.processed_bytes += block.memory_size(); + self.processed_rows += rows_num; + if self.first_block_start.is_none() { + self.first_block_start = Some(Instant::now()); + } + { match &mut self.hash_table { HashTable::MovedOut => unreachable!(), @@ -449,6 +464,26 @@ impl AccumulatingTransform for TransformPartialAggrega HashTable::AggregateHashTable(hashtable) => { let partition_count = hashtable.payload.partition_count(); let mut blocks = Vec::with_capacity(partition_count); + + log::info!( + "Aggregated {} to {} rows in {} sec(real: {}). ({} rows/sec, {}/sec, {})", + self.processed_rows, + hashtable.payload.len(), + self.start.elapsed().as_secs_f64(), + if let Some(t) = &self.first_block_start { + t.elapsed().as_secs_f64() + } else { + self.start.elapsed().as_secs_f64() + }, + convert_number_size( + self.processed_rows as f64 / self.start.elapsed().as_secs_f64() + ), + convert_byte_size( + self.processed_bytes as f64 / self.start.elapsed().as_secs_f64() + ), + convert_byte_size(self.processed_bytes as f64), + ); + for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() { if payload.len() != 0 { blocks.push(DataBlock::empty_with_meta( diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs index e4062b84a70d..7878df9e5ef8 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::sync::Arc; +use std::time::Instant; use std::vec; use bumpalo::Bump; @@ -107,6 +108,11 @@ pub struct TransformPartialGroupBy { probe_state: ProbeState, settings: GroupBySettings, params: Arc, + + start: Instant, + first_block_start: Option, + processed_rows: usize, + processed_bytes: usize, } impl TransformPartialGroupBy { @@ -142,6 +148,10 @@ impl TransformPartialGroupBy { probe_state: ProbeState::default(), params, settings: GroupBySettings::try_from(ctx)?, + start: Instant::now(), + first_block_start: None, + processed_bytes: 0, + processed_rows: 0, }, )) } @@ -151,12 +161,19 @@ impl AccumulatingTransform for TransformPartialGroupBy const NAME: &'static str = "TransformPartialGroupBy"; fn transform(&mut self, block: DataBlock) -> Result> { - let block = block.convert_to_full(); + let block = block.consume_convert_to_full(); + + let rows_num = block.num_rows(); + + self.processed_bytes += block.memory_size(); + self.processed_rows += rows_num; + if self.first_block_start.is_none() { + self.first_block_start = Some(Instant::now()); + } + let group_columns = InputColumns::new_block_proxy(&self.params.group_columns, &block); { - let rows_num = block.num_rows(); - match &mut self.hash_table { HashTable::MovedOut => unreachable!(), HashTable::HashTable(cell) => { @@ -305,6 +322,26 @@ impl AccumulatingTransform for TransformPartialGroupBy HashTable::AggregateHashTable(hashtable) => { let partition_count = hashtable.payload.partition_count(); let mut blocks = Vec::with_capacity(partition_count); + + log::info!( + "Aggregated {} to {} rows in {} sec(real: {}). ({} rows/sec, {}/sec, {})", + self.processed_rows, + hashtable.payload.len(), + self.start.elapsed().as_secs_f64(), + if let Some(t) = &self.first_block_start { + t.elapsed().as_secs_f64() + } else { + self.start.elapsed().as_secs_f64() + }, + convert_number_size( + self.processed_rows as f64 / self.start.elapsed().as_secs_f64() + ), + convert_byte_size( + self.processed_bytes as f64 / self.start.elapsed().as_secs_f64() + ), + convert_byte_size(self.processed_bytes as f64), + ); + for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() { if payload.len() != 0 { blocks.push(DataBlock::empty_with_meta( @@ -316,7 +353,6 @@ impl AccumulatingTransform for TransformPartialGroupBy )); } } - blocks } }) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs index de05ea7d2e31..1de36a979b1f 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs @@ -15,9 +15,12 @@ use std::alloc::Layout; use std::borrow::BorrowMut; use std::sync::Arc; +use std::time::Instant; use std::vec; use bumpalo::Bump; +use databend_common_base::base::convert_byte_size; +use databend_common_base::base::convert_number_size; use databend_common_catalog::plan::AggIndexMeta; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -47,6 +50,11 @@ pub struct PartialSingleStateAggregator { places: Vec, arg_indices: Vec>, funcs: Vec, + + start: Instant, + first_block_start: Option, + rows: usize, + bytes: usize, } impl PartialSingleStateAggregator { @@ -76,6 +84,10 @@ impl PartialSingleStateAggregator { places, funcs: params.aggregate_functions.clone(), arg_indices: params.aggregate_functions_arguments.clone(), + start: Instant::now(), + 
first_block_start: None, + rows: 0, + bytes: 0, }) } } @@ -84,13 +96,17 @@ impl AccumulatingTransform for PartialSingleStateAggregator { const NAME: &'static str = "AggregatorPartialTransform"; fn transform(&mut self, block: DataBlock) -> Result> { + if self.first_block_start.is_none() { + self.first_block_start = Some(Instant::now()); + } + let is_agg_index_block = block .get_meta() .and_then(AggIndexMeta::downcast_ref_from) .map(|index| index.is_agg) .unwrap_or_default(); - let block = block.convert_to_full(); + let block = block.consume_convert_to_full(); for (idx, func) in self.funcs.iter().enumerate() { let place = self.places[idx]; @@ -107,6 +123,9 @@ impl AccumulatingTransform for PartialSingleStateAggregator { } } + self.rows += block.num_rows(); + self.bytes += block.memory_size(); + Ok(vec![]) } @@ -137,6 +156,20 @@ impl AccumulatingTransform for PartialSingleStateAggregator { } } + log::info!( + "Aggregated {} to 1 rows in {} sec (real: {}). ({} rows/sec, {}/sec, {})", + self.rows, + self.start.elapsed().as_secs_f64(), + if let Some(t) = &self.first_block_start { + t.elapsed().as_secs_f64() + } else { + self.start.elapsed().as_secs_f64() + }, + convert_number_size(self.rows as f64 / self.start.elapsed().as_secs_f64()), + convert_byte_size(self.bytes as f64 / self.start.elapsed().as_secs_f64()), + convert_byte_size(self.bytes as _), + ); + Ok(generate_data_block) } } @@ -195,7 +228,7 @@ impl AccumulatingTransform for FinalSingleStateAggregator { fn transform(&mut self, block: DataBlock) -> Result> { if !block.is_empty() { - let block = block.convert_to_full(); + let block = block.consume_convert_to_full(); for (index, _) in self.funcs.iter().enumerate() { let binary_array = block.get_by_offset(index).value.as_column().unwrap(); diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs index fb5f9a649b35..c478a19f5ebf 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs @@ -352,7 +352,7 @@ impl Processor for TransformHashJoinProbe { { self.probe_hash_table(data_block)?; } else if let Some(data_block) = self.input_data_blocks.pop_front() { - let data_block = data_block.convert_to_full(); + let data_block = data_block.consume_convert_to_full(); self.probe_hash_table(data_block)?; } } diff --git a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs index f983b6208e65..e73dd63f6056 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs @@ -34,7 +34,6 @@ use databend_common_pipeline_transforms::processors::sort_merge; use databend_common_settings::Settings; use databend_common_storage::DataOperator; use databend_common_storages_fuse::TableContext; -use databend_storages_common_cache::TempDir; use super::WindowPartitionBuffer; use super::WindowPartitionMeta; @@ -42,6 +41,7 @@ use super::WindowSpillSettings; use crate::sessions::QueryContext; use crate::spillers::Spiller; use crate::spillers::SpillerConfig; +use crate::spillers::SpillerDiskConfig; use 
crate::spillers::SpillerType; #[derive(Debug, Clone, Copy)] @@ -99,7 +99,7 @@ impl TransformWindowPartitionCollect { num_processors: usize, num_partitions: usize, spill_settings: WindowSpillSettings, - disk_spill: Option>, + disk_spill: Option, sort_desc: Vec, schema: DataSchemaRef, have_order_col: bool, diff --git a/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs b/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs index 21371abcec8e..a168b43cacb1 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs @@ -1005,7 +1005,7 @@ where T: Number + ResultTypeOfUnary let num_rows = data.num_rows(); if num_rows != 0 { self.blocks.push_back(WindowBlock { - block: data.convert_to_full(), + block: data.consume_convert_to_full(), builder: ColumnBuilder::with_capacity(&self.func.return_type()?, num_rows), }); } diff --git a/src/query/service/src/servers/mysql/writers/query_result_writer.rs b/src/query/service/src/servers/mysql/writers/query_result_writer.rs index 6beefcb0525e..d0251eefa50a 100644 --- a/src/query/service/src/servers/mysql/writers/query_result_writer.rs +++ b/src/query/service/src/servers/mysql/writers/query_result_writer.rs @@ -242,7 +242,7 @@ impl<'a, W: AsyncWrite + Send + Unpin> DFQueryResultWriter<'a, W> { let mut buf = Vec::::new(); let columns = block - .convert_to_full() + .consume_convert_to_full() .columns() .iter() .map(|column| column.value.clone().into_column().unwrap()) diff --git a/src/query/service/src/spillers/mod.rs b/src/query/service/src/spillers/mod.rs index 31d420849041..f867e56749f9 100644 --- a/src/query/service/src/spillers/mod.rs +++ b/src/query/service/src/spillers/mod.rs @@ -17,8 +17,4 @@ mod spiller; pub use partition_buffer::PartitionBuffer; pub use partition_buffer::PartitionBufferFetchOption; -pub use spiller::Location; -pub use spiller::SpilledData; -pub use spiller::Spiller; -pub use spiller::SpillerConfig; -pub use spiller::SpillerType; +pub use spiller::*; diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index d13229b65df6..964b7ff4b04b 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -16,25 +16,30 @@ use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Display; use std::fmt::Formatter; -use std::io; use std::ops::Range; use std::sync::Arc; use std::time::Instant; +use buf_list::BufList; +use buf_list::Cursor; +use bytes::Buf; +use bytes::Bytes; use databend_common_base::base::dma_buffer_as_vec; use databend_common_base::base::dma_read_file_range; -use databend_common_base::base::dma_write_file_vectored; +use databend_common_base::base::Alignment; +use databend_common_base::base::DmaWriteBuf; use databend_common_base::base::GlobalUniqName; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; -use databend_common_expression::arrow::deserialize_column; -use databend_common_expression::arrow::serialize_column; +use databend_common_expression::arrow::read_column; +use databend_common_expression::arrow::write_column; use databend_common_expression::DataBlock; use databend_storages_common_cache::TempDir; use 
databend_storages_common_cache::TempPath; +use opendal::Buffer; use opendal::Operator; use crate::sessions::QueryContext; @@ -65,10 +70,16 @@ impl Display for SpillerType { #[derive(Clone)] pub struct SpillerConfig { pub location_prefix: String, - pub disk_spill: Option>, + pub disk_spill: Option, pub spiller_type: SpillerType, } +#[derive(Clone)] +pub struct SpillerDiskConfig { + pub temp_dir: Arc, + pub local_operator: Option, +} + /// Spiller is a unified framework for operators which need to spill data from memory. /// It provides the following features: /// 1. Collection data that needs to be spilled. @@ -80,7 +91,9 @@ pub struct Spiller { ctx: Arc, operator: Operator, location_prefix: String, - disk_spill: Option>, + temp_dir: Option>, + // for dio disabled + local_operator: Option, _spiller_type: SpillerType, pub join_spilling_partition_bits: usize, /// 1 partition -> N partition files @@ -98,19 +111,29 @@ impl Spiller { operator: Operator, config: SpillerConfig, ) -> Result { - let join_spilling_partition_bits = ctx.get_settings().get_join_spilling_partition_bits()?; + let settings = ctx.get_settings(); let SpillerConfig { location_prefix, disk_spill, spiller_type, } = config; + + let (temp_dir, local_operator) = match disk_spill { + Some(SpillerDiskConfig { + temp_dir, + local_operator, + }) => (Some(temp_dir), local_operator), + None => (None, None), + }; + Ok(Self { ctx, operator, location_prefix, - disk_spill, + temp_dir, + local_operator, _spiller_type: spiller_type, - join_spilling_partition_bits, + join_spilling_partition_bits: settings.get_join_spilling_partition_bits()?, partition_location: Default::default(), columns_layout: Default::default(), partition_spilled_bytes: Default::default(), @@ -126,10 +149,16 @@ impl Spiller { let instant = Instant::now(); // Spill data to storage. - let encoded = EncodedBlock::from_block(data_block); - let columns_layout = encoded.columns_layout(); - let data_size = encoded.size(); - let location = self.write_encodes(data_size, vec![encoded]).await?; + let mut encoder = self.block_encoder(); + encoder.add_block(data_block); + let data_size = encoder.size(); + let BlocksEncoder { + buf, + mut columns_layout, + .. + } = encoder; + + let location = self.write_encodes(data_size, buf).await?; // Record statistics. match location { @@ -138,7 +167,8 @@ impl Spiller { } // Record columns layout for spilled data. - self.columns_layout.insert(location.clone(), columns_layout); + self.columns_layout + .insert(location.clone(), columns_layout.pop().unwrap()); Ok(location) } @@ -179,24 +209,35 @@ impl Spiller { partitioned_data: Vec<(usize, DataBlock)>, ) -> Result { // Serialize data block. - let mut write_bytes = 0; - let mut write_data = Vec::with_capacity(partitioned_data.len()); - let mut spilled_partitions = Vec::with_capacity(partitioned_data.len()); + let mut encoder = self.block_encoder(); + let mut partition_ids = Vec::new(); for (partition_id, data_block) in partitioned_data.into_iter() { - let begin = write_bytes; - - let encoded = EncodedBlock::from_block(data_block); - let columns_layout = encoded.columns_layout(); - let data_size = encoded.size(); - - write_bytes += data_size; - write_data.push(encoded); - spilled_partitions.push((partition_id, begin..write_bytes, columns_layout)); + partition_ids.push(partition_id); + encoder.add_block(data_block); } + let write_bytes = encoder.size(); + let BlocksEncoder { + buf, + offsets, + columns_layout, + .. 
+ } = encoder; + + let partitions = partition_ids + .into_iter() + .zip( + offsets + .windows(2) + .map(|x| x[0]..x[1]) + .zip(columns_layout.into_iter()), + ) + .map(|(id, (range, layout))| (id, range, layout)) + .collect(); + // Spill data to storage. let instant = Instant::now(); - let location = self.write_encodes(write_bytes, write_data).await?; + let location = self.write_encodes(write_bytes, buf).await?; // Record statistics. match location { @@ -206,7 +247,7 @@ impl Spiller { Ok(SpilledData::MergedPartition { location, - partitions: spilled_partitions, + partitions, }) } @@ -217,22 +258,28 @@ impl Spiller { // Read spilled data from storage. let instant = Instant::now(); - let block = match location { - Location::Remote(loc) => { - let data = self.operator.read(loc).await?.to_bytes(); - record_remote_read_profile(&instant, data.len()); - deserialize_block(columns_layout, &data) - } - Location::Local(path) => { + let data = match (location, &self.local_operator) { + (Location::Local(path), None) => { + debug_assert_eq!(path.size(), columns_layout.iter().sum::()); let file_size = path.size(); - debug_assert_eq!(file_size, columns_layout.iter().sum::()); let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; - let data = &buf[range]; - record_local_read_profile(&instant, data.len()); - deserialize_block(columns_layout, data) + Buffer::from(dma_buffer_as_vec(buf)).slice(range) + } + (Location::Local(path), Some(ref local)) => { + debug_assert_eq!(path.size(), columns_layout.iter().sum::()); + local + .read(path.file_name().unwrap().to_str().unwrap()) + .await? } + (Location::Remote(loc), _) => self.operator.read(loc).await?, }; + match location { + Location::Remote(_) => record_remote_read_profile(&instant, data.len()), + Location::Local(_) => record_local_read_profile(&instant, data.len()), + } + + let block = deserialize_block(columns_layout, data); Ok(block) } @@ -266,9 +313,8 @@ impl Spiller { // Read spilled data from storage. let instant = Instant::now(); - let data = match location { - Location::Remote(loc) => self.operator.read(loc).await?.to_bytes(), - Location::Local(path) => { + let data = match (location, &self.local_operator) { + (Location::Local(path), None) => { let file_size = path.size(); debug_assert_eq!( file_size, @@ -279,12 +325,15 @@ impl Spiller { } ); - let (mut buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; - assert_eq!(range.start, 0); - buf.truncate(range.end); - - dma_buffer_as_vec(buf).into() + let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; + Buffer::from(dma_buffer_as_vec(buf)).slice(range) + } + (Location::Local(path), Some(ref local)) => { + local + .read(path.file_name().unwrap().to_str().unwrap()) + .await? } + (Location::Remote(loc), _) => self.operator.read(loc).await?, }; // Record statistics. 
@@ -297,7 +346,7 @@ impl Spiller { let partitioned_data = partitions .iter() .map(|(partition_id, range, columns_layout)| { - let block = deserialize_block(columns_layout, &data[range.clone()]); + let block = deserialize_block(columns_layout, data.slice(range.clone())); (*partition_id, block) }) .collect(); @@ -317,28 +366,29 @@ impl Spiller { let instant = Instant::now(); let data_range = data_range.start as u64..data_range.end as u64; - match location { - Location::Remote(loc) => { - let data = self - .operator - .read_with(loc) + let data = match (location, &self.local_operator) { + (Location::Local(path), None) => { + let (buf, range) = dma_read_file_range(path, data_range).await?; + Buffer::from(dma_buffer_as_vec(buf)).slice(range) + } + (Location::Local(path), Some(ref local)) => { + local + .read_with(path.file_name().unwrap().to_str().unwrap()) .range(data_range) .await? - .to_bytes(); - record_remote_read_profile(&instant, data.len()); - Ok(deserialize_block(columns_layout, &data)) - } - Location::Local(path) => { - let (buf, range) = dma_read_file_range(path, data_range).await?; - let data = &buf[range]; - record_local_read_profile(&instant, data.len()); - Ok(deserialize_block(columns_layout, data)) } + (Location::Remote(loc), _) => self.operator.read_with(loc).range(data_range).await?, + }; + + match location { + Location::Remote(_) => record_remote_read_profile(&instant, data.len()), + Location::Local(_) => record_local_read_profile(&instant, data.len()), } + Ok(deserialize_block(columns_layout, data)) } - async fn write_encodes(&mut self, size: usize, blocks: Vec) -> Result { - let location = match &self.disk_spill { + async fn write_encodes(&mut self, size: usize, buf: DmaWriteBuf) -> Result { + let location = match &self.temp_dir { None => None, Some(disk) => disk.new_file_with_size(size)?.map(Location::Local), } @@ -348,37 +398,42 @@ impl Spiller { GlobalUniqName::unique(), ))); - let written = match &location { - Location::Remote(loc) => { - let mut writer = self - .operator - .writer_with(loc) - .chunk(8 * 1024 * 1024) - .await?; - - let mut written = 0; - for data in blocks.into_iter().flat_map(|x| x.0) { - written += data.len(); - writer.write(data).await?; - } - - writer.close().await?; - written + let mut writer = match (&location, &self.local_operator) { + (Location::Local(path), None) => { + let written = buf.into_file(path, true).await?; + debug_assert_eq!(size, written); + return Ok(location); } - Location::Local(path) => { - let bufs = blocks - .iter() - .flat_map(|x| &x.0) - .map(|data| io::IoSlice::new(data)) - .collect::>(); - - dma_write_file_vectored(path.as_ref(), &bufs).await? 
+ (Location::Local(path), Some(local)) => { + local.writer_with(path.file_name().unwrap().to_str().unwrap()) } - }; + (Location::Remote(loc), _) => self.operator.writer_with(loc), + } + .chunk(8 * 1024 * 1024) + .await?; + + let buf = buf + .into_data() + .into_iter() + .map(|x| Bytes::from(dma_buffer_as_vec(x))) + .collect::(); + let written = buf.len(); + writer.write(buf).await?; + writer.close().await?; + debug_assert_eq!(size, written); Ok(location) } + fn block_encoder(&self) -> BlocksEncoder { + let align = self + .temp_dir + .as_ref() + .map(|dir| dir.block_alignment()) + .unwrap_or(Alignment::MIN); + BlocksEncoder::new(align, 8 * 1024 * 1024) + } + pub(crate) fn spilled_files(&self) -> Vec { self.columns_layout.keys().cloned().collect() } @@ -398,39 +453,50 @@ pub enum Location { Local(TempPath), } -pub struct EncodedBlock(pub Vec>); +struct BlocksEncoder { + buf: DmaWriteBuf, + offsets: Vec, + columns_layout: Vec>, +} + +impl BlocksEncoder { + fn new(align: Alignment, chunk: usize) -> Self { + Self { + buf: DmaWriteBuf::new(align, chunk), + offsets: vec![0], + columns_layout: Vec::new(), + } + } -impl EncodedBlock { - pub fn from_block(block: DataBlock) -> Self { - let data = block - .columns() - .iter() - .map(|entry| { + fn add_block(&mut self, block: DataBlock) { + let columns_layout = std::iter::once(self.size()) + .chain(block.columns().iter().map(|entry| { let column = entry .value .convert_to_full_column(&entry.data_type, block.num_rows()); - serialize_column(&column) - }) + write_column(&column, &mut self.buf).unwrap(); + self.size() + })) + .map_windows(|x: &[_; 2]| x[1] - x[0]) .collect(); - EncodedBlock(data) - } - pub fn columns_layout(&self) -> Vec { - self.0.iter().map(|data| data.len()).collect() + self.columns_layout.push(columns_layout); + self.offsets.push(self.size()) } - pub fn size(&self) -> usize { - self.0.iter().map(|data| data.len()).sum() + fn size(&self) -> usize { + self.buf.size() } } -pub fn deserialize_block(columns_layout: &[usize], mut data: &[u8]) -> DataBlock { +pub fn deserialize_block(columns_layout: &[usize], mut data: Buffer) -> DataBlock { let columns = columns_layout .iter() - .map(|layout| { - let (cur, remain) = data.split_at(*layout); - data = remain; - deserialize_column(cur).unwrap() + .map(|&layout| { + let ls = BufList::from_iter(data.slice(0..layout)); + data.advance(layout); + let mut cursor = Cursor::new(ls); + read_column(&mut cursor).unwrap() }) .collect::>(); diff --git a/src/query/service/tests/it/storages/fuse/operations/internal_column.rs b/src/query/service/tests/it/storages/fuse/operations/internal_column.rs index 5527734d4cb9..a759e15d1eb0 100644 --- a/src/query/service/tests/it/storages/fuse/operations/internal_column.rs +++ b/src/query/service/tests/it/storages/fuse/operations/internal_column.rs @@ -71,8 +71,8 @@ fn expected_data_block( } fn check_data_block(expected: Vec, blocks: Vec) -> Result<()> { - let expected_data_block = DataBlock::concat(&expected)?.convert_to_full(); - let data_block = DataBlock::concat(&blocks)?.convert_to_full(); + let expected_data_block = DataBlock::concat(&expected)?.consume_convert_to_full(); + let data_block = DataBlock::concat(&blocks)?.consume_convert_to_full(); for (expected_column, column) in expected_data_block .columns() diff --git a/src/query/service/tests/it/storages/testdata/columns_table.txt b/src/query/service/tests/it/storages/testdata/columns_table.txt index 614f58070a07..df5879922802 100644 --- a/src/query/service/tests/it/storages/testdata/columns_table.txt +++ 
b/src/query/service/tests/it/storages/testdata/columns_table.txt @@ -15,6 +15,8 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'arguments' | 'system' | 'procedures' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'arguments' | 'system' | 'user_functions' | 'Variant' | 'VARIANT' | '' | '' | 'NO' | '' | | 'attempt_number' | 'system' | 'task_history' | 'Int32' | 'INT' | '' | '' | 'NO' | '' | +| 'attribute_names' | 'system' | 'dictionaries' | 'Array(String)' | 'ARRAY(STRING)' | '' | '' | 'NO' | '' | +| 'attribute_types' | 'system' | 'dictionaries' | 'Array(String)' | 'ARRAY(STRING)' | '' | '' | 'NO' | '' | | 'auth_type' | 'system' | 'users' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'auto_increment' | 'information_schema' | 'tables' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' | | 'byte_size' | 'system' | 'clustering_history' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' | @@ -59,6 +61,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'command' | 'system' | 'processes' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'comment' | 'information_schema' | 'statistics' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' | | 'comment' | 'system' | 'columns' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | +| 'comment' | 'system' | 'dictionaries' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'comment' | 'system' | 'notifications' | 'Nullable(String)' | 'VARCHAR' | '' | '' | 'YES' | '' | | 'comment' | 'system' | 'password_policies' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'comment' | 'system' | 'procedures' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | @@ -82,6 +85,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'create_time' | 'information_schema' | 'tables' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | | 'created_on' | 'system' | 'background_jobs' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | | 'created_on' | 'system' | 'background_tasks' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | +| 'created_on' | 'system' | 'dictionaries' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | | 'created_on' | 'system' | 'indexes' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | | 'created_on' | 'system' | 'locks' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | | 'created_on' | 'system' | 'notification_history' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | @@ -116,6 +120,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'data_write_bytes' | 'system' | 'processes' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' | | 'database' | 'system' | 'clustering_history' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'database' | 'system' | 'columns' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | +| 'database' | 'system' | 'dictionaries' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'database' | 'system' | 'processes' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'database' | 'system' | 'streams' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'database' | 'system' | 'streams_terse' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | @@ -231,6 +236,8 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'job_type' | 'system' | 'background_jobs' | 'Nullable(String)' | 'VARCHAR' | '' | '' | 'YES' | '' | | 'join_spilled_bytes' | 'system' | 'query_log' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' | | 'join_spilled_rows' | 'system' | 'query_log' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | 
'' | +| 'key_names' | 'system' | 'dictionaries' | 'Array(String)' | 'ARRAY(STRING)' | '' | '' | 'NO' | '' | +| 'key_types' | 'system' | 'dictionaries' | 'Array(String)' | 'ARRAY(STRING)' | '' | '' | 'NO' | '' | | 'keywords' | 'information_schema' | 'keywords' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'kind' | 'system' | 'metrics' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'labels' | 'system' | 'metrics' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | @@ -266,6 +273,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'name' | 'system' | 'contributors' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'name' | 'system' | 'credits' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'name' | 'system' | 'databases' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | +| 'name' | 'system' | 'dictionaries' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'name' | 'system' | 'functions' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'name' | 'system' | 'indexes' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'name' | 'system' | 'malloc_stats_totals' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | @@ -386,6 +394,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'session_settings' | 'system' | 'query_log' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'size' | 'system' | 'caches' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' | | 'snapshot_location' | 'system' | 'streams' | 'Nullable(String)' | 'VARCHAR' | '' | '' | 'YES' | '' | +| 'source' | 'system' | 'dictionaries' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'sql' | 'system' | 'query_cache' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'sql_path' | 'information_schema' | 'schemata' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' | | 'sql_user' | 'system' | 'query_log' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | @@ -465,6 +474,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'update_on' | 'system' | 'roles' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | | 'update_on' | 'system' | 'users' | 'Nullable(Timestamp)' | 'TIMESTAMP' | '' | '' | 'YES' | '' | | 'updated_on' | 'system' | 'background_tasks' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | +| 'updated_on' | 'system' | 'dictionaries' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | | 'updated_on' | 'system' | 'indexes' | 'Nullable(Timestamp)' | 'TIMESTAMP' | '' | '' | 'YES' | '' | | 'updated_on' | 'system' | 'password_policies' | 'Nullable(Timestamp)' | 'TIMESTAMP' | '' | '' | 'YES' | '' | | 'updated_on' | 'system' | 'streams' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 50f9aeb27e5b..3600ee150d5a 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -273,6 +273,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("enable_dio", DefaultSettingValue{ + value: UserSettingValue::UInt64(1), + desc: "Enables Direct IO.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ("disable_join_reorder", DefaultSettingValue { value: UserSettingValue::UInt64(0), desc: "Disable join reorder optimization.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 55c2e46cd7e5..cb45b3d00f34 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ 
b/src/query/settings/src/settings_getter_setter.rs
@@ -265,6 +265,10 @@ impl Settings {
         Ok(self.try_get_u64("enable_cbo")? != 0)
     }
 
+    pub fn get_enable_dio(&self) -> Result<bool> {
+        Ok(self.try_get_u64("enable_dio")? != 0)
+    }
+
     /// # Safety
     pub unsafe fn get_disable_join_reorder(&self) -> Result<bool> {
         Ok(self.unchecked_try_get_u64("disable_join_reorder")? != 0)
diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs
index 03b90fd00bb2..b7023e6a4e2e 100644
--- a/src/query/sql/src/planner/binder/binder.rs
+++ b/src/query/sql/src/planner/binder/binder.rs
@@ -288,7 +288,7 @@ impl<'a> Binder {
             Statement::CreateDictionary(stmt) => self.bind_create_dictionary(stmt).await?,
             Statement::DropDictionary(stmt) => self.bind_drop_dictionary(stmt).await?,
             Statement::ShowCreateDictionary(stmt) => self.bind_show_create_dictionary(stmt).await?,
-            Statement::ShowDictionaries { show_options: _ } => todo!(),
+            Statement::ShowDictionaries(stmt) => self.bind_show_dictionaries(bind_context, stmt).await?,
             // Views
             Statement::CreateView(stmt) => self.bind_create_view(stmt).await?,
             Statement::AlterView(stmt) => self.bind_alter_view(stmt).await?,
diff --git a/src/query/sql/src/planner/binder/ddl/dictionary.rs b/src/query/sql/src/planner/binder/ddl/dictionary.rs
index 6b54019fd5ee..800b16ef5bff 100644
--- a/src/query/sql/src/planner/binder/ddl/dictionary.rs
+++ b/src/query/sql/src/planner/binder/ddl/dictionary.rs
@@ -19,6 +19,8 @@ use std::sync::LazyLock;
 use databend_common_ast::ast::CreateDictionaryStmt;
 use databend_common_ast::ast::DropDictionaryStmt;
 use databend_common_ast::ast::ShowCreateDictionaryStmt;
+use databend_common_ast::ast::ShowDictionariesStmt;
+use databend_common_ast::ast::ShowLimit;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::types::DataType;
@@ -28,12 +30,16 @@ use databend_common_expression::TableDataType;
 use databend_common_expression::TableSchema;
 use databend_common_meta_app::schema::DictionaryMeta;
 use itertools::Itertools;
+use log::debug;
 
 use crate::plans::CreateDictionaryPlan;
 use crate::plans::DropDictionaryPlan;
 use crate::plans::Plan;
+use crate::plans::RewriteKind;
 use crate::plans::ShowCreateDictionaryPlan;
+use crate::BindContext;
 use crate::Binder;
+use crate::SelectBuilder;
 
 pub const DICT_OPT_KEY_SQL_HOST: &str = "host";
 pub const DICT_OPT_KEY_SQL_PORT: &str = "port";
@@ -383,4 +389,50 @@ impl Binder {
             },
         )))
     }
+
+    #[async_backtrace::framed]
+    pub(in crate::planner::binder) async fn bind_show_dictionaries(
+        &mut self,
+        bind_context: &mut BindContext,
+        stmt: &ShowDictionariesStmt,
+    ) -> Result<Plan> {
+        let ShowDictionariesStmt { database, limit } = stmt;
+
+        let mut select_builder = SelectBuilder::from("system.dictionaries");
+
+        select_builder
+            .with_column("database AS Database")
+            .with_column("name AS Dictionary")
+            .with_column("key_names AS Key_Names")
+            .with_column("key_types AS Key_Types")
+            .with_column("attribute_names AS Attribute_Names")
+            .with_column("attribute_types AS Attribute_Types")
+            .with_column("source AS Source")
+            .with_column("comment AS Comment");
+
+        select_builder
+            .with_order_by("database")
+            .with_order_by("name");
+
+        let database = self.check_database_exist(&None, database).await?;
+        select_builder.with_filter(format!("database = '{}'", database.clone()));
+
+        match limit {
+            None => (),
+            Some(ShowLimit::Like { pattern }) => {
+                select_builder.with_filter(format!("name LIKE '{pattern}'"));
+            }
+            Some(ShowLimit::Where { selection }) => {
select_builder.with_filter(format!("({selection})")); + } + }; + let query = select_builder.build(); + debug!("show dictionaries rewrite to: {:?}", query); + self.bind_rewrite_to_query( + bind_context, + query.as_str(), + RewriteKind::ShowDictionaries(database.clone()), + ) + .await + } } diff --git a/src/query/sql/src/planner/plans/plan.rs b/src/query/sql/src/planner/plans/plan.rs index 245fa278e83e..d2df531f5be7 100644 --- a/src/query/sql/src/planner/plans/plan.rs +++ b/src/query/sql/src/planner/plans/plan.rs @@ -390,6 +390,7 @@ pub enum RewriteKind { ShowColumns(String, String, String), ShowTablesStatus, ShowVirtualColumns, + ShowDictionaries(String), ShowStreams(String), diff --git a/src/query/sql/src/planner/semantic/type_check.rs b/src/query/sql/src/planner/semantic/type_check.rs index cea2467b2afe..e570836ec62a 100644 --- a/src/query/sql/src/planner/semantic/type_check.rs +++ b/src/query/sql/src/planner/semantic/type_check.rs @@ -1165,13 +1165,8 @@ impl<'a> TypeChecker<'a> { } self.in_window_function = false; - let frame = self.resolve_window_frame( - span, - &func, - &partitions, - &mut order_by, - spec.window_frame.clone(), - )?; + let frame = + self.resolve_window_frame(span, &func, &mut order_by, spec.window_frame.clone())?; let data_type = func.return_type(); let window_func = WindowFunc { span, @@ -1319,7 +1314,6 @@ impl<'a> TypeChecker<'a> { &mut self, span: Span, func: &WindowFuncType, - partition_by: &[ScalarExpr], order_by: &mut [WindowOrderBy], window_frame: Option, ) -> Result { @@ -1354,18 +1348,10 @@ impl<'a> TypeChecker<'a> { }); } WindowFuncType::Ntile(_) => { - return Ok(if partition_by.is_empty() { - WindowFuncFrame { - units: WindowFuncFrameUnits::Rows, - start_bound: WindowFuncFrameBound::Preceding(None), - end_bound: WindowFuncFrameBound::Following(None), - } - } else { - WindowFuncFrame { - units: WindowFuncFrameUnits::Rows, - start_bound: WindowFuncFrameBound::CurrentRow, - end_bound: WindowFuncFrameBound::CurrentRow, - } + return Ok(WindowFuncFrame { + units: WindowFuncFrameUnits::Rows, + start_bound: WindowFuncFrameBound::Preceding(None), + end_bound: WindowFuncFrameBound::Following(None), }); } WindowFuncType::CumeDist => { diff --git a/src/query/storages/common/cache/src/temp_dir.rs b/src/query/storages/common/cache/src/temp_dir.rs index 5493e9faf400..54f0465e0c94 100644 --- a/src/query/storages/common/cache/src/temp_dir.rs +++ b/src/query/storages/common/cache/src/temp_dir.rs @@ -30,6 +30,7 @@ use std::sync::Arc; use std::sync::Mutex; use std::sync::Once; +use databend_common_base::base::Alignment; use databend_common_base::base::GlobalInstance; use databend_common_base::base::GlobalUniqName; use databend_common_config::SpillConfig; @@ -44,14 +45,15 @@ pub struct TempDirManager { global_limit: usize, // Reserved disk space in blocks reserved: u64, + alignment: Alignment, group: Mutex, } impl TempDirManager { pub fn init(config: &SpillConfig, tenant_id: &str) -> Result<()> { - let (root, reserved) = if config.path.is_empty() { - (None, 0) + let (root, reserved, alignment) = if config.path.is_empty() { + (None, 0, Alignment::MIN) } else { let path = PathBuf::from(&config.path) .join(tenant_id) @@ -66,13 +68,18 @@ impl TempDirManager { } if create_dir_all(&path).is_err() { - (None, 0) + (None, 0, Alignment::MIN) } else { + let path = path.canonicalize()?.into_boxed_path(); + let stat = statvfs(path.as_ref()).map_err(|e| ErrorCode::StorageOther(e.to_string()))?; - let reserved = (stat.f_blocks as f64 * *config.reserved_disk_ratio) as u64; - (Some(path), 
reserved) + ( + Some(path), + (stat.f_blocks as f64 * *config.reserved_disk_ratio) as u64, + Alignment::new(stat.f_bsize.max(512) as usize).unwrap(), + ) } }; @@ -80,6 +87,7 @@ impl TempDirManager { root, global_limit: config.global_bytes_limit as usize, reserved, + alignment, group: Mutex::new(Group { dirs: HashMap::new(), }), @@ -175,10 +183,17 @@ impl TempDirManager { } } + pub fn block_alignment(&self) -> Alignment { + self.alignment + } + fn insufficient_disk(&self, size: u64) -> Result { let stat = statvfs(self.root.as_ref().unwrap().as_ref()) .map_err(|e| ErrorCode::Internal(e.to_string()))?; - Ok(stat.f_bavail < self.reserved + (size + stat.f_frsize - 1) / stat.f_frsize) + + debug_assert_eq!(stat.f_frsize, self.alignment.as_usize() as u64); + let n = self.alignment.align_up_count(size as usize) as u64; + Ok(stat.f_bavail < self.reserved + n) } } @@ -200,6 +215,8 @@ pub struct TempDir { } impl TempDir { + // It should be ensured that the actual size is less than or equal to + // the reserved size as much as possible, otherwise the limit may be exceeded. pub fn new_file_with_size(&self, size: usize) -> Result> { let path = self.path.join(GlobalUniqName::unique()).into_boxed_path(); @@ -241,6 +258,14 @@ impl TempDir { }); Ok(rt?) } + + pub fn block_alignment(&self) -> Alignment { + self.manager.alignment + } + + pub fn path(&self) -> &Path { + &self.path + } } struct DirInfo { @@ -306,6 +331,27 @@ impl TempPath { pub fn size(&self) -> usize { self.0.size } + + pub fn set_size(&mut self, size: usize) -> std::result::Result<(), &'static str> { + use std::cmp::Ordering; + + let Some(path) = Arc::get_mut(&mut self.0) else { + return Err("can't set size after share"); + }; + match size.cmp(&path.size) { + Ordering::Equal => {} + Ordering::Greater => { + let mut dir = path.dir_info.size.lock().unwrap(); + *dir += size - path.size; + } + Ordering::Less => { + let mut dir = path.dir_info.size.lock().unwrap(); + *dir -= path.size - size; + } + } + path.size = size; + Ok(()) + } } struct InnerPath { @@ -348,11 +394,12 @@ mod tests { let mgr = TempDirManager::instance(); let dir = mgr.get_disk_spill_dir(1 << 30, "some_query").unwrap(); - let path = dir.new_file_with_size(100)?.unwrap(); + let mut path = dir.new_file_with_size(110)?.unwrap(); println!("{:?}", &path); fs::write(&path, vec![b'a'; 100])?; + path.set_size(100).unwrap(); assert_eq!(1, dir.dir_info.count.load(Ordering::Relaxed)); assert_eq!(100, *dir.dir_info.size.lock().unwrap()); @@ -395,10 +442,13 @@ mod tests { deleted.sort(); + let pwd = std::env::current_dir()?.canonicalize()?; assert_eq!( vec![ - PathBuf::from("test_data2/test_tenant/unknown_query1").into_boxed_path(), - PathBuf::from("test_data2/test_tenant/unknown_query2").into_boxed_path(), + pwd.join("test_data2/test_tenant/unknown_query1") + .into_boxed_path(), + pwd.join("test_data2/test_tenant/unknown_query2") + .into_boxed_path(), ], deleted ); diff --git a/src/query/storages/system/src/dictionaries_table.rs b/src/query/storages/system/src/dictionaries_table.rs new file mode 100644 index 000000000000..9cc0c61fafcd --- /dev/null +++ b/src/query/storages/system/src/dictionaries_table.rs @@ -0,0 +1,200 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table::Table; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_common_expression::types::DataType; +use databend_common_expression::types::StringType; +use databend_common_expression::types::TimestampType; +use databend_common_expression::ColumnBuilder; +use databend_common_expression::DataBlock; +use databend_common_expression::FromData; +use databend_common_expression::ScalarRef; +use databend_common_expression::TableDataType; +use databend_common_expression::TableField; +use databend_common_expression::TableSchemaRefExt; +use databend_common_meta_app::schema::ListDictionaryReq; +use databend_common_meta_app::schema::TableIdent; +use databend_common_meta_app::schema::TableInfo; +use databend_common_meta_app::schema::TableMeta; + +use crate::table::AsyncOneBlockSystemTable; +use crate::table::AsyncSystemTable; + +pub struct DictionariesTable { + table_info: TableInfo, +} + +#[async_trait::async_trait] +impl AsyncSystemTable for DictionariesTable { + const NAME: &'static str = "system.dictionaries"; + + fn get_table_info(&self) -> &TableInfo { + &self.table_info + } + + #[async_backtrace::framed] + async fn get_full_data( + &self, + ctx: Arc, + _push_downs: Option, + ) -> Result { + let tenant = ctx.get_tenant(); + + let mut db_names = vec![]; + let mut names = vec![]; + + let mut key_names_builder = + ColumnBuilder::with_capacity(&DataType::Array(Box::new(DataType::String)), 0); + let mut attribute_names_builder = + ColumnBuilder::with_capacity(&DataType::Array(Box::new(DataType::String)), 0); + let mut key_types_builder = + ColumnBuilder::with_capacity(&DataType::Array(Box::new(DataType::String)), 0); + let mut attribute_types_builder = + ColumnBuilder::with_capacity(&DataType::Array(Box::new(DataType::String)), 0); + + let mut sources = vec![]; + let mut comments = vec![]; + let mut created_ons = vec![]; + let mut updated_ons = vec![]; + + let catalog = ctx.get_default_catalog().unwrap(); + let databases = catalog.list_databases(&tenant).await?; + for database in databases { + let db_id = database.get_db_info().database_id.db_id; + let req = ListDictionaryReq { + tenant: tenant.clone(), + db_id, + }; + let dictionaries = catalog.list_dictionaries(req).await?; + for (dict_name, dict_meta) in dictionaries { + db_names.push(database.get_db_name().to_string()); + + names.push(dict_name.clone()); + + let comment = dict_meta.comment; + comments.push(comment); + + let created_on = dict_meta.created_on.timestamp_micros(); + created_ons.push(created_on); + let updated_on = match dict_meta.updated_on { + Some(updated_on) => updated_on.timestamp_micros(), + None => created_on, + }; + updated_ons.push(updated_on); + + let schema = dict_meta.schema; + let fields = &schema.fields; + let primary_column_ids = dict_meta.primary_column_ids; + + let mut key_names = vec![]; + let mut attribute_names = vec![]; + let mut key_types = vec![]; + let mut attribute_types = vec![]; + + for field in fields { + if 
primary_column_ids.contains(&field.column_id) { + key_names.push(field.name.clone()); + key_types.push(field.data_type.sql_name()); + } else { + attribute_names.push(field.name.clone()); + attribute_types.push(field.data_type.sql_name()); + } + } + let key_names_column = ScalarRef::Array(StringType::from_data(key_names)); + key_names_builder.push(key_names_column); + let attribute_names_column = + ScalarRef::Array(StringType::from_data(attribute_names)); + attribute_names_builder.push(attribute_names_column); + let key_types_column = ScalarRef::Array(StringType::from_data(key_types)); + key_types_builder.push(key_types_column); + let attribute_types_column = + ScalarRef::Array(StringType::from_data(attribute_types)); + attribute_types_builder.push(attribute_types_column); + + let dict_source = dict_meta.source; + let mut options = dict_meta.options; + if let Some(password) = options.get_mut("password") { + *password = "[hidden]".to_string(); + } + let options_str: Vec = options + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect(); + let options_joined = options_str.join(" "); + let source = format!("{}({})", dict_source, options_joined); + sources.push(source); + } + } + return Ok(DataBlock::new_from_columns(vec![ + StringType::from_data(db_names), + StringType::from_data(names), + key_names_builder.build(), + key_types_builder.build(), + attribute_names_builder.build(), + attribute_types_builder.build(), + StringType::from_data(sources), + StringType::from_data(comments), + TimestampType::from_data(created_ons), + TimestampType::from_data(updated_ons), + ])); + } +} + +impl DictionariesTable { + pub fn create(table_id: u64) -> Arc { + let schema = TableSchemaRefExt::create(vec![ + TableField::new("database", TableDataType::String), + TableField::new("name", TableDataType::String), + TableField::new( + "key_names", + TableDataType::Array(Box::new(TableDataType::String)), + ), + TableField::new( + "key_types", + TableDataType::Array(Box::new(TableDataType::String)), + ), + TableField::new( + "attribute_names", + TableDataType::Array(Box::new(TableDataType::String)), + ), + TableField::new( + "attribute_types", + TableDataType::Array(Box::new(TableDataType::String)), + ), + TableField::new("source", TableDataType::String), + TableField::new("comment", TableDataType::String), + TableField::new("created_on", TableDataType::Timestamp), + TableField::new("updated_on", TableDataType::Timestamp), + ]); + + let table_info = TableInfo { + desc: "'system'.'dictionaries'".to_string(), + name: "dictionaries".to_string(), + ident: TableIdent::new(table_id, 0), + meta: TableMeta { + schema, + engine: "SystemDictionaries".to_string(), + ..Default::default() + }, + ..Default::default() + }; + + AsyncOneBlockSystemTable::create(DictionariesTable { table_info }) + } +} diff --git a/src/query/storages/system/src/lib.rs b/src/query/storages/system/src/lib.rs index 8cbaee5ac4db..85c14c4045c3 100644 --- a/src/query/storages/system/src/lib.rs +++ b/src/query/storages/system/src/lib.rs @@ -33,6 +33,7 @@ mod configs_table; mod contributors_table; mod credits_table; mod databases_table; +mod dictionaries_table; mod engines_table; mod functions_table; mod indexes_table; @@ -80,6 +81,7 @@ pub use configs_table::ConfigsTable; pub use contributors_table::ContributorsTable; pub use credits_table::CreditsTable; pub use databases_table::DatabasesTable; +pub use dictionaries_table::DictionariesTable; pub use engines_table::EnginesTable; pub use functions_table::FunctionsTable; pub use indexes_table::IndexesTable; 
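For reference, the source column of the new system.dictionaries table is rendered as "<source>(<key>=<value> ...)" with the password value redacted before display. Below is a minimal standalone sketch of that rendering, not part of this patch: the helper name render_source is hypothetical, and it assumes the dictionary option map iterates in sorted key order, which is what the sqllogictest expectations further down rely on.

use std::collections::BTreeMap;

// Build the display string for a dictionary source, hiding the password,
// mirroring the logic in DictionariesTable::get_full_data.
fn render_source(source: &str, mut options: BTreeMap<String, String>) -> String {
    if let Some(password) = options.get_mut("password") {
        *password = "[hidden]".to_string();
    }
    let joined = options
        .iter()
        .map(|(k, v)| format!("{}={}", k, v))
        .collect::<Vec<_>>()
        .join(" ");
    format!("{}({})", source, joined)
}

fn main() {
    let options = BTreeMap::from([
        ("host".to_string(), "localhost".to_string()),
        ("port".to_string(), "3306".to_string()),
        ("username".to_string(), "root".to_string()),
        ("password".to_string(), "1234".to_string()),
        ("db".to_string(), "db1".to_string()),
        ("table".to_string(), "test_table".to_string()),
    ]);
    assert_eq!(
        render_source("mysql", options),
        "mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root)"
    );
}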
diff --git a/src/query/storages/system/src/temp_files_table.rs b/src/query/storages/system/src/temp_files_table.rs index 34ca6ddfa459..6c53b84a73d6 100644 --- a/src/query/storages/system/src/temp_files_table.rs +++ b/src/query/storages/system/src/temp_files_table.rs @@ -119,7 +119,7 @@ impl AsyncSystemTable for TempFilesTable { num_rows, ); - Ok(data_block.convert_to_full()) + Ok(data_block.consume_convert_to_full()) } } diff --git a/tests/sqllogictests/suites/base/06_show/06_0024_show_dictionaries.test b/tests/sqllogictests/suites/base/06_show/06_0024_show_dictionaries.test new file mode 100644 index 000000000000..914bc37fdba2 --- /dev/null +++ b/tests/sqllogictests/suites/base/06_show/06_0024_show_dictionaries.test @@ -0,0 +1,65 @@ +statement ok +CREATE OR REPLACE DICTIONARY d1(c1 int NOT NULL, c2 Varchar NOT NULL) PRIMARY KEY c1 SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) + +statement ok +CREATE OR REPLACE DICTIONARY d2(a int NOT NULL, b int NOT NULL) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) + +query T +show dictionaries +---- +default d1 ['c1'] ['INT'] ['c2'] ['VARCHAR'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) +default d2 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) + +statement ok +DROP DATABASE IF EXISTS show_dictionary + +statement ok +CREATE DATABASE show_dictionary + +statement ok +use show_dictionary + +statement ok +CREATE OR REPLACE DICTIONARY show_dictionary.d1(c1 VARCHAR NOT NULL, c2 VARCHAR NOT NULL) PRIMARY KEY c1 SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) + +statement ok +CREATE OR REPLACE DICTIONARY show_dictionary.d2(a int NOT NULL, b int NOT NULL) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) + +statement ok +CREATE OR REPLACE DICTIONARY show_dictionary.d3(`a` int NOT NULL, b int NOT NULL) PRIMARY KEY a SOURCE(mysql(host='localhost' port='3306' username='root' password='1234' db='db1' table='test_table')) + +query T +show dictionaries from show_dictionary +---- +show_dictionary d1 ['c1'] ['VARCHAR'] ['c2'] ['VARCHAR'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) +show_dictionary d2 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) +show_dictionary d3 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) + +query T +show dictionaries from show_dictionary like 'd%' +---- +show_dictionary d1 ['c1'] ['VARCHAR'] ['c2'] ['VARCHAR'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) +show_dictionary d2 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) +show_dictionary d3 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) + +query T +show dictionaries from show_dictionary WHERE name = 'd2' OR 1 = 1 +---- +show_dictionary d1 ['c1'] ['VARCHAR'] ['c2'] ['VARCHAR'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) +show_dictionary d2 ['a'] ['INT'] ['b'] 
['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) +show_dictionary d3 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) + +query T +show dictionaries from show_dictionary WHERE name = 'd2' AND 1 = 1 +---- +show_dictionary d2 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) + +statement ok +show dictionaries WHERE name='d2' AND 1=0 + +query T +show dictionaries +---- +show_dictionary d1 ['c1'] ['VARCHAR'] ['c2'] ['VARCHAR'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) +show_dictionary d2 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) +show_dictionary d3 ['a'] ['INT'] ['b'] ['INT'] mysql(db=db1 host=localhost password=[hidden] port=3306 table=test_table username=root) (empty) diff --git a/tests/sqllogictests/suites/query/window_function/window_ntile.test b/tests/sqllogictests/suites/query/window_function/window_ntile.test index f00dd683bc3b..d9f467c3cd95 100644 --- a/tests/sqllogictests/suites/query/window_function/window_ntile.test +++ b/tests/sqllogictests/suites/query/window_function/window_ntile.test @@ -168,4 +168,88 @@ statement ok USE default statement ok -DROP DATABASE test_window_ntile \ No newline at end of file +DROP DATABASE test_window_ntile + +statement ok +CREATE or REPLACE TABLE t ( id INT NULL, kind VARCHAR NULL, per FLOAT NULL); + +statement ok +INSERT INTO t (id, kind, per) VALUES +(17, 'a', 10.0), (17, 'a', 20.5), (17, 'a', 30.25), (17, 'a', 40.75), (17, 'a', 50.0), +(17, 'a', 60.5), (17, 'a', 70.0), (17, 'a', 80.25), (17, 'a', 90.5), (17, 'a', 100.0), +(17, 'a', 110.75), (17, 'a', 120.5), (17, 'a', 130.0), (17, 'a', 140.25), (17, 'a', 150.5), +(17, 'a', 160.0), (17, 'a', 170.75), (17, 'a', 180.5), (17, 'a', 190.0), (17, 'a', 200.0), +(17, 'a', 210.5), (17, 'a', 220.25), (17, 'a', 230.0), (17, 'a', 240.75), (17, 'a', 250.5), +(17, 'a', 260.0), (17, 'a', 270.0), (17, 'a', 280.5), (17, 'a', 290.0), (17, 'a', 300.0), +(17, 'a', 310.5), (17, 'a', 320.0), (17, 'a', 330.25), (17, 'a', 340.0), (17, 'a', 350.5), +(17, 'a', 360.0), (17, 'a', 370.0), (17, 'a', 380.5), (17, 'a', 390.0), (17, 'a', 400.0), +(17, 'a', 410.75), (17, 'a', 420.5), (17, 'a', 430.0), (17, 'a', 440.25), (17, 'a', 450.5), +(17, 'a', 460.0), (17, 'a', 470.75), (17, 'a', 480.5), (17, 'a', 490.0), (17, 'a', 500.0), +(17, 'a', 510.5), (17, 'a', 520.25), (17, 'a', 530.0), (17, 'a', 540.5), (17, 'a', 550.0), +(17, 'a', 560.0), (17, 'a', 570.5), (17, 'a', 580.0), (17, 'a', 590.25), (17, 'a', 600.0), +(17, 'a', 610.5), (17, 'a', 620.0), (17, 'a', 630.0), (17, 'a', 640.5), (17, 'a', 650.0), +(17, 'a', 660.0), (17, 'a', 670.5), (17, 'a', 680.0), (17, 'a', 690.0), (17, 'a', 700.5), +(17, 'a', 710.0), (17, 'a', 720.0), (17, 'a', 730.5), (17, 'a', 740.0), (17, 'a', 750.25), +(17, 'a', 760.0), (17, 'a', 770.5), (17, 'a', 780.0), (17, 'a', 790.0), (17, 'a', 800.5), +(17, 'a', 810.0), (17, 'a', 820.0), (17, 'a', 830.5), (17, 'a', 840.0), (17, 'a', 850.0), +(17, 'a', 860.5), (17, 'a', 870.0), (17, 'a', 930.0), (17, 'a', 930.0), (17, 'a', 930.0), +(17, 'a', 930.0), (17, 'a', 930.0), (17, 'a', 930.0), (17, 'a', 1000.0), (17, 'a', 1000.0), +(17, 'a', 1000.0), (17, 'a', 1000.0), (17, 'a', 1000.0), (17, 'a', 1000.0), (17, 'a', 1000.0); + + +query TTTT +SELECT + COUNT(*), quantile, id, kind +FROM + ( + 
SELECT + id, kind, ntile(10) OVER ( ORDER BY per ASC ) AS quantile + FROM + (SELECT * FROM t) + ) +GROUP BY + quantile, id, kind +ORDER BY + quantile, id, kind; +---- +10 1 17 a +10 2 17 a +10 3 17 a +10 4 17 a +10 5 17 a +10 6 17 a +10 7 17 a +10 8 17 a +10 9 17 a +10 10 17 a + +query TTTT +SELECT + COUNT(*), quantile, id, kind +FROM + ( + SELECT + id, + kind, + ntile(10) OVER ( PARTITION BY id, kind ORDER BY per ASC ) AS quantile + FROM + (SELECT * FROM t) + ) +GROUP BY + quantile, id, kind +ORDER BY + quantile, id, kind; +---- +10 1 17 a +10 2 17 a +10 3 17 a +10 4 17 a +10 5 17 a +10 6 17 a +10 7 17 a +10 8 17 a +10 9 17 a +10 10 17 a + +statement ok +drop table if exists t; diff --git a/tests/sqllogictests/suites/query/window_function/window_partition_spill.test b/tests/sqllogictests/suites/query/window_function/window_partition_spill.test index fccbd097486a..031a0e8778b0 100644 --- a/tests/sqllogictests/suites/query/window_function/window_partition_spill.test +++ b/tests/sqllogictests/suites/query/window_function/window_partition_spill.test @@ -10,6 +10,9 @@ set window_partition_spilling_bytes_threshold_per_proc = 1024 * 1024 * 1; statement ok set window_partition_spilling_to_disk_bytes_limit = 1024 * 1024 * 1024; +statement ok +set enable_dio = 1; + query T SELECT SUM(number + a + b) FROM ( @@ -35,6 +38,37 @@ FROM ( ---- 1499998499576 +statement ok +set enable_dio = 0; + +query T +SELECT SUM(number + a + b) +FROM ( + SELECT + number, + LEAD(number, 1, 0) OVER (PARTITION BY number % 16 ORDER BY number + 1) AS a, + LEAD(number, 2, 0) OVER (PARTITION BY number % 16 ORDER BY number + 1) AS b + FROM numbers(5000000) +); +---- +37499992499384 + +query I +SELECT SUM(a + b + c) +FROM ( + SELECT + number, + LEAD(number, 1, 0) OVER (PARTITION BY number % 8 ORDER BY number + 2) a, + LEAD(number, 2, 0) OVER (PARTITION BY number % 8 ORDER BY number + 2) b, + LEAD(number, 3, 0) OVER (PARTITION BY number % 8 ORDER BY number + 2) c + FROM numbers(1000000) +); +---- +1499998499576 + +statement ok +unset enable_dio; + statement ok DROP TABLE IF EXISTS customers;
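The spill tests above exercise the new enable_dio setting with both values; when enabled, spill files are written through DmaWriteBuf using the temp directory's block alignment, and free-space checks in TempDirManager::insufficient_disk count whole filesystem blocks. Below is a minimal standalone sketch of that block-aligned arithmetic, not Databend code: the free functions and the 4096-byte block size are illustrative assumptions, whereas the real alignment is taken from statvfs via block_alignment().

// Round a byte count up to a whole number of blocks, as the reserved-space
// check and DMA buffer sizing do.
fn align_up(value: usize, align: usize) -> usize {
    debug_assert!(align.is_power_of_two());
    (value + align - 1) & !(align - 1)
}

fn align_up_count(value: usize, align: usize) -> usize {
    align_up(value, align) / align
}

fn main() {
    let block = 4096; // assumed filesystem block size (statvfs f_bsize)
    assert_eq!(align_up(1, block), 4096);
    assert_eq!(align_up(8192, block), 8192);
    assert_eq!(align_up_count(8193, block), 3);

    // insufficient_disk-style check: spilling `size` bytes needs
    // `align_up_count(size, block)` free blocks beyond the reserved blocks.
    let (f_bavail, reserved, size) = (10_000u64, 1_000u64, 8193usize);
    let insufficient = f_bavail < reserved + align_up_count(size, block) as u64;
    assert!(!insufficient);
}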