Skip to content

Commit

Permalink
Merge branch 'databendlabs:main' into graphical
Browse files Browse the repository at this point in the history
  • Loading branch information
Maricaya authored Oct 17, 2024
2 parents fd30551 + a37384b commit f868302
Show file tree
Hide file tree
Showing 53 changed files with 1,078 additions and 783 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions src/common/base/src/base/dma.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ struct DmaFile {
}

impl DmaFile {
async fn open_raw(path: impl AsRef<Path>, dio: bool) -> io::Result<File> {
async fn open_raw(path: impl AsRef<Path>, #[allow(unused)] dio: bool) -> io::Result<File> {
#[allow(unused_mut)]
let mut flags = 0;
#[cfg(target_os = "linux")]
if dio {
Expand All @@ -196,7 +197,8 @@ impl DmaFile {
.await
}

async fn create_raw(path: impl AsRef<Path>, dio: bool) -> io::Result<File> {
async fn create_raw(path: impl AsRef<Path>, #[allow(unused)] dio: bool) -> io::Result<File> {
#[allow(unused_mut)]
let mut flags = OFlags::EXCL;
#[cfg(target_os = "linux")]
if dio {
Expand Down
15 changes: 15 additions & 0 deletions src/common/base/src/base/watch_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ impl WatchNotify {
pub fn notify_waiters(&self) {
let _ = self.tx.send_replace(true);
}

pub fn notify_one(&self) {
self.notify_waiters()
}
}

#[cfg(test)]
Expand All @@ -68,4 +72,15 @@ mod tests {
let notified = notify.notified();
notified.await;
}

#[tokio::test]
async fn test_notify_one() {
let notify = WatchNotify::new();
// notify_waiters ahead of notified being instantiated and awaited
notify.notify_one();

// this should not await indefinitely
let notified = notify.notified();
notified.await;
}
}
3 changes: 3 additions & 0 deletions src/meta/types/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub struct NodeInfo {
pub secret: String,
pub cpu_nums: u64,
pub version: u32,
pub http_address: String,
pub flight_address: String,
pub discovery_address: String,
pub binary_version: String,
Expand All @@ -88,6 +89,7 @@ impl NodeInfo {
id: String,
secret: String,
cpu_nums: u64,
http_address: String,
flight_address: String,
discovery_address: String,
binary_version: String,
Expand All @@ -97,6 +99,7 @@ impl NodeInfo {
secret,
cpu_nums,
version: 0,
http_address,
flight_address,
discovery_address,
binary_version,
Expand Down
1 change: 1 addition & 0 deletions src/meta/types/tests/it/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ fn test_node_info_ip_port() -> anyhow::Result<()> {
secret: "".to_string(),
cpu_nums: 1,
version: 1,
http_address: "7.8.9.10:987".to_string(),
flight_address: "1.2.3.4:123".to_string(),
discovery_address: "4.5.6.7:456".to_string(),
binary_version: "v0.8-binary-version".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/ast/src/ast/format/syntax/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ pub(crate) fn pretty_copy_into_location(copy_stmt: CopyIntoLocationStmt) -> RcDo
.append(
RcDoc::line()
.append(RcDoc::text("SINGLE = "))
.append(RcDoc::text(copy_stmt.single.to_string())),
.append(RcDoc::text(copy_stmt.options.single.to_string())),
)
}

Expand Down
96 changes: 62 additions & 34 deletions src/query/ast/src/ast/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use crate::ast::Identifier;
use crate::ast::Lambda;
use crate::ast::SelectStageOptions;
use crate::ast::WindowDefinition;
use crate::ParseError;
use crate::Result;
use crate::Span;

/// Root node of a query tree
Expand Down Expand Up @@ -623,56 +625,82 @@ impl Display for TemporalClause {
}
}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, Drive, DriveMut)]
pub enum SampleLevel {
ROW,
BLOCK,
}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Drive, DriveMut)]
pub enum SampleConfig {
Probability(f64),
pub enum SampleRowLevel {
RowsNum(f64),
Probability(f64),
}

impl Eq for SampleConfig {}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, Drive, DriveMut)]
pub struct Sample {
pub sample_level: SampleLevel,
pub sample_conf: SampleConfig,
}

impl Sample {
pub fn sample_probability(&self, stats_rows: Option<u64>) -> Option<f64> {
let rand = match &self.sample_conf {
SampleConfig::Probability(probability) => probability / 100.0,
SampleConfig::RowsNum(rows) => {
impl SampleRowLevel {
pub fn sample_probability(&self, stats_rows: Option<u64>) -> Result<Option<f64>> {
let rand = match &self {
SampleRowLevel::Probability(probability) => probability / 100.0,
SampleRowLevel::RowsNum(rows) => {
if let Some(row_num) = stats_rows {
if row_num > 0 {
rows / row_num as f64
} else {
return None;
return Ok(None);
}
} else {
return None;
return Ok(None);
}
}
};
Some(rand)
if rand > 1.0 {
return Err(ParseError(
None,
format!(
"Sample value should be less than or equal to 100, but got {}",
rand * 100.0
),
));
}
Ok(Some(rand))
}
}

impl Eq for SampleRowLevel {}

#[derive(
serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Drive, DriveMut, Default,
)]
pub struct SampleConfig {
pub row_level: Option<SampleRowLevel>,
pub block_level: Option<f64>,
}

impl SampleConfig {
pub fn set_row_level_sample(&mut self, value: f64, rows: bool) {
if rows {
self.row_level = Some(SampleRowLevel::RowsNum(value));
} else {
self.row_level = Some(SampleRowLevel::Probability(value));
}
}

pub fn set_block_level_sample(&mut self, probability: f64) {
self.block_level = Some(probability);
}
}

impl Display for Sample {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
impl Eq for SampleConfig {}

impl Display for SampleConfig {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "SAMPLE ")?;
match self.sample_level {
SampleLevel::ROW => write!(f, "ROW ")?,
SampleLevel::BLOCK => write!(f, "BLOCK ")?,
if let Some(block_level) = self.block_level {
write!(f, "BLOCK ({}) ", block_level)?;
}
match &self.sample_conf {
SampleConfig::Probability(prob) => write!(f, "({})", prob)?,
SampleConfig::RowsNum(rows) => write!(f, "({} ROWS)", rows)?,
if let Some(row_level) = &self.row_level {
match row_level {
SampleRowLevel::RowsNum(rows) => {
write!(f, "ROW ({} ROWS)", rows)?;
}
SampleRowLevel::Probability(probability) => {
write!(f, "ROW ({})", probability)?;
}
}
}
Ok(())
}
Expand All @@ -692,7 +720,7 @@ pub enum TableReference {
with_options: Option<WithOptions>,
pivot: Option<Box<Pivot>>,
unpivot: Option<Box<Unpivot>>,
sample: Option<Sample>,
sample: Option<SampleConfig>,
},
// `TABLE(expr)[ AS alias ]`
TableFunction {
Expand All @@ -703,7 +731,7 @@ pub enum TableReference {
params: Vec<Expr>,
named_params: Vec<(Identifier, Expr)>,
alias: Option<TableAlias>,
sample: Option<Sample>,
sample: Option<SampleConfig>,
},
// Derived table, which can be a subquery or joined tables or combination of them
Subquery {
Expand Down
48 changes: 39 additions & 9 deletions src/query/ast/src/ast/statements/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,29 @@ impl Display for CopyIntoTableStmt {
}
}

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Drive, DriveMut, Eq)]
pub struct CopyIntoLocationOptions {
pub single: bool,
pub max_file_size: usize,
pub detailed_output: bool,
pub use_raw_path: bool,
pub include_query_id: bool,
pub overwrite: bool,
}

impl Default for CopyIntoLocationOptions {
fn default() -> Self {
Self {
single: Default::default(),
max_file_size: Default::default(),
detailed_output: false,
use_raw_path: false,
include_query_id: true,
overwrite: false,
}
}
}

/// CopyIntoLocationStmt is the parsed statement of `COPY into <location> from <table> ...`
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct CopyIntoLocationStmt {
Expand All @@ -151,9 +174,7 @@ pub struct CopyIntoLocationStmt {
pub src: CopyIntoLocationSource,
pub dst: FileLocation,
pub file_format: FileFormatOptions,
pub single: bool,
pub max_file_size: usize,
pub detailed_output: bool,
pub options: CopyIntoLocationOptions,
}

impl Display for CopyIntoLocationStmt {
Expand All @@ -171,9 +192,12 @@ impl Display for CopyIntoLocationStmt {
if !self.file_format.is_empty() {
write!(f, " FILE_FORMAT = ({})", self.file_format)?;
}
write!(f, " SINGLE = {}", self.single)?;
write!(f, " MAX_FILE_SIZE = {}", self.max_file_size)?;
write!(f, " DETAILED_OUTPUT = {}", self.detailed_output)?;
write!(f, " SINGLE = {}", self.options.single)?;
write!(f, " MAX_FILE_SIZE = {}", self.options.max_file_size)?;
write!(f, " DETAILED_OUTPUT = {}", self.options.detailed_output)?;
write!(f, " INCLUDE_QUERY_ID = {}", self.options.include_query_id)?;
write!(f, " USE_RAW_PATH = {}", self.options.use_raw_path)?;
write!(f, " OVERWRITE = {}", self.options.overwrite)?;

Ok(())
}
Expand All @@ -183,9 +207,12 @@ impl CopyIntoLocationStmt {
pub fn apply_option(&mut self, opt: CopyIntoLocationOption) {
match opt {
CopyIntoLocationOption::FileFormat(v) => self.file_format = v,
CopyIntoLocationOption::Single(v) => self.single = v,
CopyIntoLocationOption::MaxFileSize(v) => self.max_file_size = v,
CopyIntoLocationOption::DetailedOutput(v) => self.detailed_output = v,
CopyIntoLocationOption::Single(v) => self.options.single = v,
CopyIntoLocationOption::MaxFileSize(v) => self.options.max_file_size = v,
CopyIntoLocationOption::DetailedOutput(v) => self.options.detailed_output = v,
CopyIntoLocationOption::IncludeQueryID(v) => self.options.include_query_id = v,
CopyIntoLocationOption::UseRawPath(v) => self.options.use_raw_path = v,
CopyIntoLocationOption::OverWrite(v) => self.options.overwrite = v,
}
}
}
Expand Down Expand Up @@ -482,7 +509,10 @@ pub enum CopyIntoLocationOption {
FileFormat(FileFormatOptions),
MaxFileSize(usize),
Single(bool),
IncludeQueryID(bool),
UseRawPath(bool),
DetailedOutput(bool),
OverWrite(bool),
}

#[derive(Clone, Debug, PartialEq, Eq, Default, Drive, DriveMut)]
Expand Down
2 changes: 2 additions & 0 deletions src/query/ast/src/ast/statements/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ mod replace;
mod script;
mod sequence;
mod set;
mod settings;
mod show;
mod stage;
mod statement;
Expand Down Expand Up @@ -85,6 +86,7 @@ pub use replace::*;
pub use script::*;
pub use sequence::*;
pub use set::*;
pub use settings::*;
pub use show::*;
pub use stage::*;
pub use statement::*;
Expand Down
2 changes: 2 additions & 0 deletions src/query/ast/src/ast/statements/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ pub enum SetType {
pub enum SetValues {
Expr(Vec<Box<Expr>>),
Query(Box<Query>),
// None means Unset Stmt
None,
}
Loading

0 comments on commit f868302

Please sign in to comment.